Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/03/30 02:44:55 UTC

svn commit: r1307220 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/data/transfer/ src/java/org/apache/hcatalog/data/transfer/impl/ src/java/org/apache/hcatalog/data/transfer/state/ src/test/e2e/hcatalog/udfs/java/org/apa...

Author: gates
Date: Fri Mar 30 02:44:54 2012
New Revision: 1307220

URL: http://svn.apache.org/viewvc?rev=1307220&view=rev
Log:
HCATALOG-287 Add data api to HCatalog

Added:
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
    incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java
    incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java
    incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java
    incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java
Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1307220&r1=1307219&r2=1307220&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Mar 30 02:44:54 2012
@@ -23,6 +23,7 @@ Release 0.4.1 - Unreleased
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-287 Add data api to HCatalog (hashutosh)
 
   IMPROVEMENTS
 

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.data.transfer.impl.HCatInputFormatReader;
+import org.apache.hcatalog.data.transfer.impl.HCatOutputFormatWriter;
+import org.apache.hcatalog.data.transfer.state.DefaultStateProvider;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes.
+ */
+
+public class DataTransferFactory {
+
+	/**
+	 * This should be called once from the master node to obtain an instance of {@link HCatReader}.
+	 * @param re built using {@link ReadEntity.Builder}
+	 * @param config any configuration which the master node wants to pass to HCatalog
+	 * @return {@link HCatReader}
+	 */
+	public static HCatReader getHCatReader(final ReadEntity re, final Map<String,String> config) {
+		// In future, this may examine ReadEntity and/or config to return appropriate HCatReader
+		return new HCatInputFormatReader(re, config);
+	}
+
+	/**
+	 * This should be called only once from each slave node to obtain an instance of {@link HCatReader}.
+	 * @param split obtained at the master node.
+	 * @param config obtained at the master node.
+	 * @return {@link HCatReader}
+	 */
+	public static HCatReader getHCatReader(final InputSplit split, final Configuration config) {
+		// In future, this may examine config to return appropriate HCatReader
+		return getHCatReader(split, config, DefaultStateProvider.get());
+	}
+
+	/**
+	 * This should be called only once from each slave node to obtain an instance of {@link HCatReader}.
+	 * Use this variant if the external system has state to provide to HCatalog.
+	 * @param split obtained at the master node.
+	 * @param config obtained at the master node.
+	 * @param sp {@link StateProvider}
+	 * @return {@link HCatReader}
+	 */
+	public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) {
+		// In future, this may examine config to return appropriate HCatReader
+		return new HCatInputFormatReader(split, config, sp);
+	}
+	
+	/** This should be called at the master node to obtain an instance of {@link HCatWriter}.
+	 * @param we built using {@link WriteEntity.Builder}
+	 * @param config any configuration which the master node wants to pass to HCatalog
+	 * @return {@link HCatWriter}
+	 */
+	public static HCatWriter getHCatWriter(final WriteEntity we, final Map<String,String> config) {
+		// In future, this may examine WriteEntity and/or config to return appropriate HCatWriter
+		return new HCatOutputFormatWriter(we, config);
+	}
+
+	/** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
+	 * @param cntxt {@link WriterContext} obtained at the master node.
+	 * @return {@link HCatWriter}
+	 */
+	public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+		// In future, this may examine context to return appropriate HCatWriter
+		return getHCatWriter(cntxt, DefaultStateProvider.get());
+	}
+	
+	/** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
+	 * If the external system has a mechanism for providing state to HCatalog, this method
+	 * can be used.
+	 * @param cntxt {@link WriterContext} obtained at the master node.
+	 * @param sp {@link StateProvider}
+	 * @return {@link HCatWriter}
+	 */
+	public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) {
+		// In future, this may examine context to return appropriate HCatWriter
+		return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+	}
+}
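
To make the intended call sequence concrete, here is a minimal read sketch built on this factory. The class name, database, and table ("ReadSketch", "default", "mytbl") are placeholders rather than anything defined by this patch, and in a real deployment the ReaderContext would be serialized and shipped to the slaves instead of being used in the same JVM:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatReader;
    import org.apache.hcatalog.data.transfer.ReadEntity;
    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class ReadSketch {

      public static void main(String[] args) throws HCatException {
        // Master side: describe what to read and prepare the read.
        ReadEntity entity = new ReadEntity.Builder()
            .withDatabase("default")   // placeholder database
            .withTable("mytbl")        // placeholder table
            .build();
        Map<String, String> config = new HashMap<String, String>();
        HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext context = masterReader.prepareRead();

        // Slave side: each slave receives the (serialized) context and reads its splits.
        for (InputSplit split : context.getSplits()) {
          HCatReader slaveReader = DataTransferFactory.getHCatReader(split, context.getConf());
          Iterator<HCatRecord> records = slaveReader.read();
          while (records.hasNext()) {
            System.out.println(records.next());
          }
        }
      }
    }

The DataReaderMaster and DataReaderSlave utilities added below show the same flow split across separate processes.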

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common,
+ *  so this class contains the common fields.
+ */
+
+abstract class EntityBase {
+
+	String region;
+	String tableName;
+	String dbName;
+	Map<String,String> partitionKVs;
+
+
+
+	/** Common methods for {@link ReadEntity} and {@link WriteEntity}
+	 */
+
+	abstract static class Entity extends EntityBase{
+
+		public String getRegion() {
+			return region;
+		}
+		public String getTableName() {
+			return tableName;
+		}
+		public String getDbName() {
+			return dbName;
+		}
+		public Map<String, String> getPartitionKVs() {
+			return partitionKVs;
+		}
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** This abstract class is internal to HCatalog and abstracts away the notion of
+ * the underlying system from which reads will be done.
+ */
+
+public abstract class HCatReader{
+
+	/** This should be called at the master node to obtain a {@link ReaderContext}, which should then
+	 * be serialized and sent to slave nodes.
+	 * @return {@link ReaderContext}
+	 * @throws HCatException
+	 */
+	public abstract ReaderContext prepareRead() throws HCatException;
+	
+	/** This should be called at slave nodes to read {@link HCatRecord}s
+	 * @return {@link Iterator} of {@link HCatRecord}
+	 * @throws HCatException
+	 */
+	public abstract Iterator<HCatRecord> read() throws HCatException;
+	
+	/** This constructor will be invoked by {@link DataTransferFactory} at the master node.
+	 * Don't use this constructor. Instead, use {@link DataTransferFactory}.
+ 	 * @param re
+	 * @param config
+	 */
+	protected HCatReader(final ReadEntity re, final Map<String,String> config) {
+		this(config);
+		this.re = re;
+	}
+
+	/** This constructor will be invoked by {@link DataTransferFactory} at slave nodes.
+	 * Don't use this constructor. Instead, use {@link DataTransferFactory}
+	 * to obtain a properly constructed reader.
+	 * @param config
+	 * @param sp
+	 */
+	
+	protected HCatReader(final Configuration config, StateProvider sp) {
+		this.conf = config;
+		this.sp = sp;
+	}
+	
+	protected ReadEntity re;        // This will be null at slaves.
+	protected Configuration conf;   
+	protected ReaderContext info;
+	protected StateProvider sp;   // This will be null at master.
+	
+	private HCatReader(final Map<String,String> config) {
+		Configuration conf = new Configuration();
+		if (null != config) {
+			for(Entry<String, String> kv : config.entrySet()){
+				conf.set(kv.getKey(), kv.getValue());
+			}			
+		}
+		this.conf = conf;	
+	}
+	
+	public Configuration getConf() {
+		if (null == conf) {
+			throw new IllegalStateException("HCatReader is not constructed correctly.");
+		}
+		return conf;
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** This abstraction is internal to HCatalog and facilitates writing to HCatalog from external
+ * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory}.
+ */
+
+public abstract class HCatWriter {
+
+	protected Configuration conf;
+	protected WriteEntity we;     // This will be null at slave nodes.
+	protected WriterContext info; 
+	protected StateProvider sp;
+	
+	/** The external system should invoke this method exactly once from the master node.
+	 * @return {@link WriterContext} This should be serialized and sent to slave nodes to 
+	 * construct HCatWriter there.
+	 * @throws HCatException
+	 */
+	public abstract WriterContext prepareWrite() throws HCatException;
+	
+	/** This method should be used at slave nodes to perform writes.
+	 * @param recordItr {@link Iterator} of records to be written into HCatalog.
+	 * @throws {@link HCatException}
+	 */
+	public abstract void write(final Iterator<HCatRecord> recordItr) throws HCatException;
+	
+	/** This method should be called at the master node. Its primary purpose is to commit metadata.
+	 * @throws {@link HCatException}
+	 */
+	public abstract void commit(final WriterContext context) throws HCatException;
+	
+	/** This method should be called at the master node. Its primary purpose is to do cleanup in case
+	 * of failures.
+	 * @throws {@link HCatException}
+	 */
+	public abstract void abort(final WriterContext context) throws HCatException;
+	
+	/**
+	 * This constructor will be used at the master node.
+	 * @param we WriteEntity defining where in storage the records should be written.
+	 * @param config any configuration which the external system wants to communicate to HCatalog
+	 * for performing writes.
+	 */
+	protected HCatWriter(final WriteEntity we, final Map<String,String> config) {
+		this(config);
+		this.we = we;
+	}
+	
+	/** This constructor will be used at slave nodes.
+	 * @param config
+	 * @param sp
+	 */
+	protected HCatWriter(final Configuration config, final StateProvider sp) {
+		this.conf = config;
+		this.sp = sp;
+	}
+
+	private HCatWriter(final Map<String,String> config) {
+		Configuration conf = new Configuration();
+		if(config != null){
+			// config is user-provided, so it could be null; hence the null check.
+			for(Entry<String, String> kv : config.entrySet()){
+				conf.set(kv.getKey(), kv.getValue());
+			}			
+		}
+
+		this.conf = conf;	
+	}
+}
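
A matching write-side sketch follows. The class name "WriteSketch" and the table name are placeholders, the records are assumed to be supplied by the caller, and the WriterContext would normally travel between master and slaves in serialized form:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatWriter;
    import org.apache.hcatalog.data.transfer.WriteEntity;
    import org.apache.hcatalog.data.transfer.WriterContext;

    public class WriteSketch {

      public static void writeAll(Iterator<HCatRecord> records) throws HCatException {
        // Master side: describe the target table and prepare the write.
        WriteEntity entity = new WriteEntity.Builder()
            .withTable("mytbl")   // placeholder table
            .build();
        Map<String, String> config = new HashMap<String, String>();
        HCatWriter masterWriter = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext context = masterWriter.prepareWrite();

        // Slave side: obtain a writer from the (normally deserialized) context and push records.
        HCatWriter slaveWriter = DataTransferFactory.getHCatWriter(context);
        try {
          slaveWriter.write(records);
          // Master side again: commit the metadata once all slaves have succeeded.
          masterWriter.commit(context);
        } catch (HCatException e) {
          // Master side: clean up partial output on failure.
          masterWriter.abort(context);
          throw e;
        }
      }
    }

DataWriterMaster and DataWriterSlave in the e2e tests below exercise the same commit/abort sequence across separate processes.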

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class ReadEntity extends EntityBase.Entity{
+
+	private String filterString;
+
+	/** Don't instantiate {@link ReadEntity} directly. Use {@link ReadEntity.Builder} instead.
+	 * 
+	 */
+	private ReadEntity() {
+		// Not allowed
+	}
+	
+	private ReadEntity(Builder builder) {
+
+		this.region       = builder.region;
+		this.dbName       = builder.dbName;
+		this.tableName    = builder.tableName;
+		this.partitionKVs = builder.partitionKVs;
+		this.filterString = builder.filterString;
+	}
+
+	public String getFilterString() {
+		return this.filterString;
+	}
+
+	/** This class should be used to build {@link ReadEntity}. It follows the builder pattern, letting you build
+	 * your {@link ReadEntity} with whatever level of detail you want.
+	 *
+	 */
+	public static class Builder extends EntityBase {
+
+		private String filterString;
+
+		public Builder withRegion(final String region) {
+			this.region = region;
+			return this;
+		}
+
+
+		public Builder withDatabase(final String dbName) {
+			this.dbName = dbName;
+			return this;
+		}
+
+		public Builder withTable(final String tblName) {
+			this.tableName = tblName;
+			return this;
+		}
+
+		public Builder withPartition(final Map<String,String> partKVs) {
+			this.partitionKVs = partKVs;
+			return this;
+		}
+
+		public Builder withFilter(String filterString) {
+			this.filterString = filterString;
+			return this;
+		}
+
+		public ReadEntity build() {
+			return new ReadEntity(this);
+		}
+	}
+}
\ No newline at end of file
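
As a quick illustration of the builder, the fragment below (imports omitted; database, table, and filter value are placeholders) restricts a read to one partition. The filter string is handed through to InputJobInfo.create() by HCatInputFormatReader:

    // Placeholder names; the filter is a partition filter expression.
    ReadEntity entity = new ReadEntity.Builder()
        .withDatabase("default")
        .withTable("web_logs")
        .withFilter("ds=\"20120330\"")
        .build();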

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/** This class contains the {@link InputSplit}s obtained at the master node along with the
+ * configuration. It implements {@link Externalizable} so it can be serialized using
+ * standard Java mechanisms.
+ */
+public class ReaderContext implements Externalizable, Configurable {
+
+	private static final long serialVersionUID = -2656468331739574367L;
+	private List<InputSplit> splits;
+	private Configuration conf;
+
+	public ReaderContext() {
+		this.splits = new ArrayList<InputSplit>();
+		this.conf = new Configuration();
+	}
+	
+	public void setInputSplits(final List<InputSplit> splits) {
+		this.splits = splits;
+	}
+	
+	public List<InputSplit> getSplits() {
+		return splits;
+	}
+	
+	@Override
+	public Configuration getConf() {
+		return conf;
+	}
+
+	@Override
+	public void setConf(final Configuration config) {
+		conf = config;
+	}
+
+	@Override
+	public void writeExternal(ObjectOutput out) throws IOException {
+		conf.write(out);
+		out.writeInt(splits.size());
+		for (InputSplit split : splits) {
+			((HCatSplit)split).write(out);
+		}
+	}
+
+	@Override
+	public void readExternal(ObjectInput in) throws IOException,
+			ClassNotFoundException {
+		conf.readFields(in);
+		int numOfSplits = in.readInt();
+		for (int i=0 ; i < numOfSplits; i++) {
+			HCatSplit split = new HCatSplit();
+			split.readFields(in);
+			splits.add(split);
+		}
+	}
+}
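
Because ReaderContext is Externalizable with a public no-arg constructor, plain Java object streams are enough to hand it from the master to the slaves. A fragment sketch (imports and exception handling omitted; the file name is a placeholder):

    // Master: persist the context produced by prepareRead() so slaves can pick it up.
    ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("reader.context"));
    oos.writeObject(context);
    oos.close();

    // Slave: restore the context, then ask DataTransferFactory for a reader per split.
    ObjectInputStream ois = new ObjectInputStream(new FileInputStream("reader.context"));
    ReaderContext restored = (ReaderContext) ois.readObject();
    ois.close();

WriterContext below can be shipped between nodes in exactly the same way.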

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class WriteEntity extends EntityBase.Entity{
+
+	/** Don't instantiate {@link WriteEntity} directly. Use {@link Builder} to build
+	 * {@link WriteEntity}.
+	 */
+
+	private WriteEntity() {
+		// Not allowed.
+	}
+	
+	private WriteEntity(Builder builder) {
+		this.region = builder.region;
+		this.dbName = builder.dbName;
+		this.tableName = builder.tableName;
+		this.partitionKVs = builder.partitionKVs;
+	}
+	
+	/** This class should be used to build {@link WriteEntity}. It follows the builder pattern, letting you build
+	 * your {@link WriteEntity} with whatever level of detail you want.
+	 *
+	 */
+	public static class Builder extends EntityBase{
+		
+		public Builder withRegion(final String region) {
+			this.region = region;
+			return this;
+		}
+		
+		public Builder withDatabase(final String dbName) {
+			this.dbName = dbName;
+			return this;
+		}
+		
+		public Builder withTable(final String tblName) {
+			this.tableName = tblName;
+			return this;
+		}
+		
+		public Builder withPartition(final Map<String,String> partKVs) {
+			this.partitionKVs = partKVs;
+			return this;
+		}
+		
+		public WriteEntity build() {
+			return new WriteEntity(this);
+		}
+		
+	}
+}
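
A builder fragment for the write side (imports omitted; names and the partition value are placeholders), targeting a single partition:

    // The partition key/value map selects the partition the records will be written into.
    Map<String, String> partitionKVs = new HashMap<String, String>();
    partitionKVs.put("ds", "20120330");
    WriteEntity entity = new WriteEntity.Builder()
        .withDatabase("default")
        .withTable("web_logs")
        .withPartition(partitionKVs)
        .build();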

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/** This contains information obtained at the master node to help prepare slave nodes for writes.
+ * This class implements {@link Externalizable} so it can be serialized using
+ * standard Java mechanisms. The master should serialize it and make it available to slaves to
+ * prepare for writes.
+ */
+public class WriterContext implements Externalizable, Configurable{
+
+	private static final long serialVersionUID = -5899374262971611840L;
+	private Configuration conf;
+
+	public WriterContext() {
+		conf = new Configuration();
+	}
+	
+	@Override
+	public Configuration getConf() {
+		return conf;
+	}
+	
+	@Override
+	public void setConf(final Configuration config) {
+		this.conf = config;
+	}
+
+	@Override
+	public void writeExternal(ObjectOutput out) throws IOException {
+		conf.write(out);
+	}
+
+	@Override
+	public void readExternal(ObjectInput in) throws IOException,
+			ClassNotFoundException {
+		conf.readFields(in);
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/** This reader reads via {@link HCatInputFormat}
+ * 
+ */
+public class HCatInputFormatReader extends HCatReader{
+
+	private InputSplit split;
+	
+	public HCatInputFormatReader(InputSplit split, Configuration config, StateProvider sp) {
+		super(config, sp);
+		this.split = split;
+	}
+
+	public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+		super(info,config);
+	}
+
+	@Override
+	public ReaderContext prepareRead() throws HCatException {
+
+		try {
+			Job job = new Job(conf);
+			InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString());
+			HCatInputFormat.setInput(job, jobInfo);
+			HCatInputFormat hcif = new HCatInputFormat();
+			ReaderContext cntxt = new ReaderContext();
+			cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null)));
+			cntxt.setConf(job.getConfiguration());
+			return cntxt;
+		} catch (IOException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		} catch (InterruptedException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED,e);
+		}
+	}
+
+	@Override
+	public Iterator<HCatRecord> read() throws HCatException {
+
+		HCatInputFormat inpFmt = new HCatInputFormat();
+		RecordReader<WritableComparable, HCatRecord> rr;
+		try {
+			TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID());
+			rr = inpFmt.createRecordReader(split, cntxt);
+			rr.initialize(split, cntxt);
+		} catch (IOException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		} catch (InterruptedException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		}
+		return new HCatRecordItr(rr);
+	}
+
+
+	private static class HCatRecordItr implements Iterator<HCatRecord>{
+
+		private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+		HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+			curRecReader = rr;
+		}
+
+		@Override
+		public boolean hasNext(){
+			try {
+				boolean retVal = curRecReader.nextKeyValue();
+				if (retVal) {
+					return true;
+				}
+				// if it's false, we need to close the recordReader.
+				curRecReader.close();
+				return false;
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public HCatRecord next() {
+			try {
+				return curRecReader.getCurrentValue();
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Not allowed");
+		}
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+/** This writer writes via {@link HCatOutputFormat}
+ * 
+ */
+public class HCatOutputFormatWriter extends HCatWriter {
+
+	public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+		super(we, config);
+	}
+
+	public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+		super(config, sp);
+	}
+
+	@Override
+	public WriterContext prepareWrite() throws HCatException {
+		OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs());
+		Job job;
+		try {
+			job = new Job(conf);
+			HCatOutputFormat.setOutput(job, jobInfo);
+			HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+			HCatOutputFormat outFormat = new HCatOutputFormat();
+			outFormat.checkOutputSpecs(job);
+			outFormat.getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).setupJob(job);
+		} catch (IOException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		} catch (InterruptedException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		}
+		WriterContext cntxt = new WriterContext(); 
+		cntxt.setConf(job.getConfiguration());
+		return cntxt;
+	}
+
+	@Override
+	public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+		
+		int id = sp.getId();
+		setVarsInConf(id);
+		HCatOutputFormat outFormat = new HCatOutputFormat();
+		TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), id));
+		OutputCommitter committer = null;
+		RecordWriter<WritableComparable<?>, HCatRecord> writer;
+		try {
+			committer = outFormat.getOutputCommitter(cntxt);
+			committer.setupTask(cntxt);
+			writer   = outFormat.getRecordWriter(cntxt);
+			while(recordItr.hasNext()){
+				HCatRecord rec = recordItr.next();
+				writer.write(null, rec);
+			}
+			writer.close(cntxt);
+			if(committer.needsTaskCommit(cntxt)){
+				committer.commitTask(cntxt);				
+			}
+		} catch (IOException e) {
+			if(null != committer) {
+				try {
+					committer.abortTask(cntxt);
+				} catch (IOException e1) {
+					throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+				}
+			}
+			throw new HCatException("Failed while writing",e);
+		} catch (InterruptedException e) {
+			if(null != committer) {
+				try {
+					committer.abortTask(cntxt);
+				} catch (IOException e1) {
+					throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+				}
+			}
+			throw new HCatException("Failed while writing", e);
+		}
+	}
+
+	@Override
+	public void commit(WriterContext context) throws HCatException {
+		try {
+			new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+			.commitJob(new JobContext(context.getConf(), null));
+		} catch (IOException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		} catch (InterruptedException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		}
+	}
+
+	@Override
+	public void abort(WriterContext context) throws HCatException {
+		try {
+			new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+			.abortJob(new JobContext(context.getConf(), null),State.FAILED);
+		} catch (IOException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		} catch (InterruptedException e) {
+			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+		}
+	}
+	
+	private void setVarsInConf(int id) {
+		
+		// The following two config keys are required by FileOutputFormat to work correctly.
+		// In the usual Hadoop case, the JobTracker sets these before launching tasks.
+		// Since there is no JobTracker here, we set them ourselves.
+		conf.setInt("mapred.task.partition", id);
+		conf.set("mapred.task.id", "attempt__0000_r_000000_"+id); 
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import java.text.NumberFormat;
+import java.util.Random;
+
+
+public class DefaultStateProvider implements StateProvider {
+
+	/** Default implementation. Here, ids are generated randomly.
+	 */
+	@Override
+	public int getId() {
+		
+		NumberFormat numberFormat = NumberFormat.getInstance();
+	    numberFormat.setMinimumIntegerDigits(5);
+	    numberFormat.setGroupingUsed(false);
+		return Integer.parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+	}
+
+	private static StateProvider sp;
+	
+	public static synchronized StateProvider get() {
+		if (null == sp) {
+			sp = new DefaultStateProvider();
+		}
+		return sp;
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/** If an external system wants to communicate any state to slaves, it can do so via this interface.
+ * One example of this, in the case of MapReduce, is the ids assigned by the {@link JobTracker} to
+ * {@link TaskTracker}s.
+ */
+public interface StateProvider {
+
+	/** This method should return the id assigned to the slave node.
+	 * @return id
+	 */
+	public int getId();
+}
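
DefaultStateProvider above generates ids randomly; an external system that already assigns ids to its workers can expose them instead. A minimal sketch (the class name and the id source are assumptions, not part of this patch):

    import org.apache.hcatalog.data.transfer.state.StateProvider;

    /** Exposes an id that the external system has already assigned to this slave. */
    public class ExternalIdStateProvider implements StateProvider {

      private final int slaveId;

      public ExternalIdStateProvider(int slaveId) {
        this.slaveId = slaveId;
      }

      @Override
      public int getId() {
        return slaveId;
      }
    }

Such a provider is passed in via DataTransferFactory.getHCatReader(split, config, sp) or DataTransferFactory.getHCatWriter(cntxt, sp).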

Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.utils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+
+public class DataReaderMaster {
+
+	public static void main(String[] args) throws FileNotFoundException, IOException {
+
+		// This config contains all the configuration that the master node wants to provide
+		// to HCatalog.
+		Properties externalConfigs = new Properties();
+		externalConfigs.load(new FileReader(args[0]));
+		Map<String,String> config = new HashMap<String, String>();
+		
+		for (Entry<Object, Object> kv : externalConfigs.entrySet()){
+			config.put((String)kv.getKey(), (String)kv.getValue());
+		}
+		
+		// This piece of code runs in master node and gets necessary context.
+		ReaderContext context = runsInMaster(config);
+
+		ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(new File(args[1])));
+		oos.writeObject(context);
+		oos.flush();
+		oos.close();
+		// The master node will serialize the ReaderContext and make it available to the slaves.
+	}
+
+	private static ReaderContext runsInMaster(Map<String,String> config) throws HCatException {
+
+		ReadEntity.Builder builder = new ReadEntity.Builder();
+		ReadEntity entity = builder.withTable(config.get("table")).build();
+		HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+		ReaderContext cntxt = reader.prepareRead();
+		return cntxt;
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,44 @@
+package org.apache.hcatalog.utils;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+
+public class DataReaderSlave {
+
+	public static void main(String[] args) throws IOException, ClassNotFoundException {
+		
+		ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File(args[0])));
+		ReaderContext cntxt = (ReaderContext) ois.readObject();
+		ois.close();
+		
+		String[] inpSplitsToRead = args[1].split(",");
+		List<InputSplit> splits = cntxt.getSplits();
+		
+		for (int i = 0; i < inpSplitsToRead.length; i++){
+			InputSplit split = splits.get(Integer.parseInt(inpSplitsToRead[i]));
+			HCatReader reader = DataTransferFactory.getHCatReader(split, cntxt.getConf());
+			Iterator<HCatRecord> itr = reader.read();
+			File f = new File(args[2]+"-"+i);
+			f.delete();
+			BufferedWriter outFile = new BufferedWriter(new FileWriter(f)); 
+			while(itr.hasNext()){
+				String rec = itr.next().toString().replaceFirst("\\s+$", "");
+				System.err.println(rec);
+				outFile.write(rec+"\n");
+			}
+			outFile.close();
+		}
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+
+public class DataWriterMaster {
+
+	public static void main(String[] args) throws FileNotFoundException, IOException, ClassNotFoundException {
+
+		// This config contains all the configuration that the master node wants to provide
+		// to HCatalog.
+		Properties externalConfigs = new Properties();
+		externalConfigs.load(new FileReader(args[0]));
+		Map<String,String> config = new HashMap<String, String>();
+
+		for (Entry<Object, Object> kv : externalConfigs.entrySet()){
+			System.err.println("k: " + kv.getKey() + "\t v: " + kv.getValue());
+			config.put((String)kv.getKey(), (String)kv.getValue());
+		}
+
+		if(args.length == 3 && "commit".equalsIgnoreCase(args[2])){
+			// Then, master commits if everything goes well.
+			ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File(args[1])));
+			WriterContext cntxt = (WriterContext)ois.readObject();
+			commit(config,true, cntxt);		
+			System.exit(0);
+		}
+		// This piece of code runs in master node and gets necessary context.
+		WriterContext cntxt = runsInMaster(config);
+		
+		
+		// The master node will serialize the WriterContext and make it available to the slaves.
+		File f = new File(args[1]);
+		f.delete();
+		ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(f));
+		oos.writeObject(cntxt);
+		oos.flush();
+		oos.close();
+	}
+
+	private static WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+
+		WriteEntity.Builder builder = new WriteEntity.Builder();
+		WriteEntity entity = builder.withTable(config.get("table")).build();
+		HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+		WriterContext info = writer.prepareWrite();
+		return info;
+	}
+
+	private static void commit(Map<String, String> config, boolean status, WriterContext cntxt) throws HCatException {
+
+		WriteEntity.Builder builder = new WriteEntity.Builder();
+		WriteEntity entity = builder.withTable(config.get("table")).build();
+		HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+		if(status){
+			writer.commit(cntxt);			
+		} else {
+			writer.abort(cntxt);
+		}
+	} 
+}

Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,69 @@
+package org.apache.hcatalog.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriterContext;
+
+public class DataWriterSlave {
+
+	public static void main(String[] args) throws FileNotFoundException, IOException, ClassNotFoundException {
+		
+		ObjectInputStream ois = new ObjectInputStream(new FileInputStream(args[0]));
+		WriterContext cntxt = (WriterContext) ois.readObject();
+		ois.close();
+		
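+		// Rebuild the writer on the slave from the WriterContext the master prepared and serialized.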
+		HCatWriter writer = DataTransferFactory.getHCatWriter(cntxt);
+		writer.write(new HCatRecordItr(args[1]));
+		
+	}
+	
+	private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+		BufferedReader reader;
+		String curLine;
+		
+		public HCatRecordItr(String fileName) throws FileNotFoundException {
+			reader = new BufferedReader(new FileReader(new File(fileName)));
+		}
+		
+		@Override
+		public boolean hasNext() {
+			// Reads a line on every call, so hasNext() must be invoked exactly once
+			// before each next(); calling it twice in a row would skip an input line.
+			try {
+				curLine = reader.readLine();
+			} catch (IOException e) {
+				throw new RuntimeException("Failed to read input file", e);
+			}
+			return curLine != null;
+		}
+
+		@Override
+		public HCatRecord next() {
+
+			String[] fields = curLine.split("\t");
+			List<Object> data = new ArrayList<Object>(3);
+			data.add(fields[0]);
+			data.add(Integer.parseInt(fields[1]));
+			data.add(Double.parseDouble(fields[2]));
+			return new DefaultHCatRecord(data);
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

Added: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestReaderWriter {
+
+	@Test
+	public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException {
+
+		HiveConf conf = new HiveConf(getClass());
+		Driver driver = new Driver(conf);
+		SessionState.start(new CliSessionState(conf));
+		driver.run("drop table mytbl");
+		driver.run("create table mytbl (a string, b int)");
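+		// Flatten the HiveConf into a Map<String, String>, since the data transfer API takes its configuration in that form.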
+		Iterator<Entry<String,String>> itr = conf.iterator();
+		Map<String,String> map = new HashMap<String, String>();
+		while(itr.hasNext()){
+			Entry<String,String> kv = itr.next();
+			map.put(kv.getKey(), kv.getValue());
+		}
+		
+		WriterContext cntxt = runsInMaster(map);
+		
+		File writeCntxtFile = File.createTempFile("hcat-write", "temp");
+		writeCntxtFile.deleteOnExit();
+		
+		// Serialize context.
+		ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
+		oos.writeObject(cntxt);
+		oos.flush();
+		oos.close();
+		
+		// Now, deserialize it.
+		ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
+		cntxt = (WriterContext) ois.readObject();
+		ois.close();
+		
+		runsInSlave(cntxt);
+		commit(map, true, cntxt);
+		
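+		// Write side is committed; now exercise the read path: prepare a ReaderContext on the
+		// "master", serialize it, and read each split as a "slave" would.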
+		ReaderContext readCntxt = runsInMaster(map, false);
+		
+		File readCntxtFile = File.createTempFile("hcat-read", "temp");
+		readCntxtFile.deleteOnExit();
+		oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
+		oos.writeObject(readCntxt);
+		oos.flush();
+		oos.close();
+		
+		ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
+		readCntxt = (ReaderContext) ois.readObject();
+		ois.close();
+		
+		
+		for(InputSplit split : readCntxt.getSplits()){
+			runsInSlave(split, readCntxt.getConf());			
+		}
+	}
+
+	private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+
+		WriteEntity.Builder builder = new WriteEntity.Builder();
+		WriteEntity entity = builder.withTable("mytbl").build();
+		HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+		WriterContext info = writer.prepareWrite();
+		return info;
+	}
+	
+	private ReaderContext runsInMaster(Map<String,String> config, boolean bogus) throws HCatException {
+
+		ReadEntity.Builder builder = new ReadEntity.Builder();
+		ReadEntity entity = builder.withTable("mytbl").build();
+		HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+		ReaderContext cntxt = reader.prepareRead();
+		return cntxt;
+	}
+
+	private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
+
+		HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+		Iterator<HCatRecord> itr = reader.read();
+		int i = 1;
+		while(itr.hasNext()){
+			HCatRecord read = itr.next();
+			HCatRecord written = getRecord(i++);
+			// Compare field by field, since HCatRecord doesn't implement equals().
+			Assert.assertTrue("Read: " + read.get(0) + ", Written: " + written.get(0), written.get(0).equals(read.get(0)));
+			Assert.assertTrue("Read: " + read.get(1) + ", Written: " + written.get(1), written.get(1).equals(read.get(1)));
+			Assert.assertEquals(2, read.size());
+		}
+		Assert.assertFalse(itr.hasNext());
+	}
+	
+	private void runsInSlave(WriterContext context) throws HCatException {
+
+		HCatWriter writer = DataTransferFactory.getHCatWriter(context);
+		writer.write(new HCatRecordItr());
+	}
+
+	private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException {
+
+		WriteEntity.Builder builder = new WriteEntity.Builder();
+		WriteEntity entity = builder.withTable("mytbl").build();
+		HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+		if(status){
+			writer.commit(context);			
+		} else {
+			writer.abort(context);
+		}
+	} 
+
+	private static HCatRecord getRecord(int i) {
+		List<Object> list = new ArrayList<Object>(2);
+		list.add("Row #: " + i);
+		list.add(i);
+		return new DefaultHCatRecord(list);
+	}
+	
+	private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+		int i = 0;
+
+		@Override
+		public boolean hasNext() {
+			// Keep hasNext() side-effect free; next() advances the counter.
+			return i < 100;
+		}
+
+		@Override
+		public HCatRecord next() {
+			return getRecord(++i);
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}
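
TestReaderWriter exercises both directions of the transfer API. The read half follows the same master/slave split as the write half; a minimal sketch of just that read flow, using only the calls exercised above, might look like the following. The class name and the configuration map contents are illustrative, and "mytbl" is simply the table the test creates.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatReader;
    import org.apache.hcatalog.data.transfer.ReadEntity;
    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class ReadFlowSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> config = new HashMap<String, String>(); // would carry the relevant Hive/Hadoop settings

            // Master: describe the source table and obtain a ReaderContext, which carries the splits.
            ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
            HCatReader master = DataTransferFactory.getHCatReader(entity, config);
            ReaderContext context = master.prepareRead();

            // Slaves: each split is read independently, normally after the context has been
            // serialized out to the slave (the test round-trips it through a temp file).
            for (InputSplit split : context.getSplits()) {
                HCatReader reader = DataTransferFactory.getHCatReader(split, context.getConf());
                Iterator<HCatRecord> records = reader.read();
                while (records.hasNext()) {
                    System.out.println(records.next());
                }
            }
        }
    }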