You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2012/06/22 01:40:29 UTC

svn commit: r1352738 [1/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/cli/ src/java/org/apache/hcatalog/data/schema/ src/java/org/apache/hcatalog/data/transfer/ src/java/org/apache/hcatalog/data/transfer/impl/ src/java/org/apache/h...

Author: khorgath
Date: Fri Jun 22 01:40:27 2012
New Revision: 1352738

URL: http://svn.apache.org/viewvc?rev=1352738&view=rev
Log:
HCATALOG-424 Code cleanup of tabs (khorgath)

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Jun 22 01:40:27 2012
@@ -26,6 +26,8 @@ Trunk (unreleased changes)
   HCAT-328 HCatLoader should report its input size so pig can estimate the number of reducers (traviscrawford via gates)
 
   IMPROVEMENTS
+  HCAT-424 Code cleanup of tabs (khorgath)
+
   HCAT-331 Update HCatalog to junit 4 (traviscrawford via khorgath)
 
   HCAT-414 More HCat e2e tests (khorgath via gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java Fri Jun 22 01:40:27 2012
@@ -56,11 +56,11 @@ public class HCatCli {
   @SuppressWarnings("static-access")
   public static void main(String[] args) {
 
-	  try {
-		  LogUtils.initHiveLog4j();
-	  } catch (LogInitializationException e) {
+    try {
+      LogUtils.initHiveLog4j();
+    } catch (LogInitializationException e) {
 
-	  }
+    }
 
     CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
     ss.in = System.in;
@@ -270,7 +270,7 @@ public class HCatCli {
         ss.err.println("Failed with exception " + e.getClass().getName() + ":"
                 + e.getMessage() + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
             ret = 1;
-	}
+    }
 
     int cret = driver.close();
     if (ret == 0) {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java Fri Jun 22 01:40:27 2012
@@ -153,7 +153,7 @@ public class HCatSchemaUtils {
         case STRING:
             return Type.STRING;
         case BINARY:
-        	return Type.BINARY;
+            return Type.BINARY;
         default:
             throw new TypeNotPresentException(((PrimitiveTypeInfo)basePrimitiveTypeInfo).getTypeName(), null);
         }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Fri Jun 22 01:40:27 2012
@@ -27,74 +27,109 @@ import org.apache.hcatalog.data.transfer
 import org.apache.hcatalog.data.transfer.state.DefaultStateProvider;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
 
-/** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes.
+/**
+ * Use this factory to get instances of {@link HCatReader} or {@link HCatWriter}
+ * at master and slave nodes.
  */
 
 public class DataTransferFactory {
 
-	/**
-	 * This should be called once from master node to obtain an instance of {@link HCatReader}.
-	 * @param re ReadEntity built using {@link ReadEntity.Builder}
-	 * @param config any configuration which master node wants to pass to HCatalog
-	 * @return {@link HCatReader}
-	 */
-	public static HCatReader getHCatReader(final ReadEntity re, final Map<String,String> config) {
-		// In future, this may examine ReadEntity and/or config to return appropriate HCatReader
-		return new HCatInputFormatReader(re, config);
-	}
-
-	/**
-	 * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
-	 * @param split input split obtained at master node
-	 * @param config configuration obtained at master node
-	 * @return {@link HCatReader}
-	 */
-	public static HCatReader getHCatReader(final InputSplit split, final Configuration config) {
-		// In future, this may examine config to return appropriate HCatReader
-		return getHCatReader(split, config, DefaultStateProvider.get());
-	}
-
-	/**
-	 * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
-	 * This should be called if an external system has some state to provide to HCatalog.
-	 * @param split input split obtained at master node
-	 * @param config configuration obtained at master node
-	 * @param sp {@link StateProvider}
-	 * @return {@link HCatReader}
-	 */
-	public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) {
-		// In future, this may examine config to return appropriate HCatReader
-		return new HCatInputFormatReader(split, config, sp);
-	}
-	
-	/** This should be called at master node to obtain an instance of {@link HCatWriter}.
-	 * @param we WriteEntity built using {@link WriteEntity.Builder}
-	 * @param config any configuration which master wants to pass to HCatalog
-	 * @return {@link HCatWriter}
-	 */
-	public static HCatWriter getHCatWriter(final WriteEntity we, final Map<String,String> config) {
-		// In future, this may examine WriteEntity and/or config to return appropriate HCatWriter
-		return new HCatOutputFormatWriter(we, config);
-	}
-
- 	/** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- 	 * @param cntxt {@link WriterContext} obtained at master node
-	 * @return {@link HCatWriter}
-	 */
-	public static HCatWriter getHCatWriter(final WriterContext cntxt) {
-		// In future, this may examine context to return appropriate HCatWriter
-		return getHCatWriter(cntxt, DefaultStateProvider.get());
-	}
-	
- 	/** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- 	 *  If an external system has some mechanism for providing state to HCatalog, this constructor
-	 *  can be used.
- 	 * @param cntxt {@link WriterContext} obtained at master node
-	 * @param sp {@link StateProvider} 
-	 * @return {@link HCatWriter}
-	 */
-	public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) {
-		// In future, this may examine context to return appropriate HCatWriter
-		return new HCatOutputFormatWriter(cntxt.getConf(), sp);
-	}
+  /**
+   * This should be called once from master node to obtain an instance of
+   * {@link HCatReader}.
+   * 
+   * @param re
+   *          ReadEntity built using {@link ReadEntity.Builder}
+   * @param config
+   *          any configuration which master node wants to pass to HCatalog
+   * @return {@link HCatReader}
+   */
+  public static HCatReader getHCatReader(final ReadEntity re,
+      final Map<String, String> config) {
+    // In future, this may examine ReadEntity and/or config to return
+    // appropriate HCatReader
+    return new HCatInputFormatReader(re, config);
+  }
+
+  /**
+   * This should only be called once from every slave node to obtain an instance
+   * of {@link HCatReader}.
+   * 
+   * @param split
+   *          input split obtained at master node
+   * @param config
+   *          configuration obtained at master node
+   * @return {@link HCatReader}
+   */
+  public static HCatReader getHCatReader(final InputSplit split,
+      final Configuration config) {
+    // In future, this may examine config to return appropriate HCatReader
+    return getHCatReader(split, config, DefaultStateProvider.get());
+  }
+
+  /**
+   * This should only be called once from every slave node to obtain an instance
+   * of {@link HCatReader}. This should be called if an external system has some
+   * state to provide to HCatalog.
+   * 
+   * @param split
+   *          input split obtained at master node
+   * @param config
+   *          configuration obtained at master node
+   * @param sp
+   *          {@link StateProvider}
+   * @return {@link HCatReader}
+   */
+  public static HCatReader getHCatReader(final InputSplit split,
+      final Configuration config, StateProvider sp) {
+    // In future, this may examine config to return appropriate HCatReader
+    return new HCatInputFormatReader(split, config, sp);
+  }
+
+  /**
+   * This should be called at master node to obtain an instance of
+   * {@link HCatWriter}.
+   * 
+   * @param we
+   *          WriteEntity built using {@link WriteEntity.Builder}
+   * @param config
+   *          any configuration which master wants to pass to HCatalog
+   * @return {@link HCatWriter}
+   */
+  public static HCatWriter getHCatWriter(final WriteEntity we,
+      final Map<String, String> config) {
+    // In future, this may examine WriteEntity and/or config to return
+    // appropriate HCatWriter
+    return new HCatOutputFormatWriter(we, config);
+  }
+
+  /**
+   * This should be called at slave nodes to obtain an instance of
+   * {@link HCatWriter}.
+   * 
+   * @param cntxt
+   *          {@link WriterContext} obtained at master node
+   * @return {@link HCatWriter}
+   */
+  public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+    // In future, this may examine context to return appropriate HCatWriter
+    return getHCatWriter(cntxt, DefaultStateProvider.get());
+  }
+
+  /**
+   * This should be called at slave nodes to obtain an instance of
+   * {@link HCatWriter}. If an external system has some mechanism for providing
+   * state to HCatalog, this constructor can be used.
+   * 
+   * @param cntxt
+   *          {@link WriterContext} obtained at master node
+   * @param sp
+   *          {@link StateProvider}
+   * @return {@link HCatWriter}
+   */
+  public static HCatWriter getHCatWriter(final WriterContext cntxt,
+      final StateProvider sp) {
+    // In future, this may examine context to return appropriate HCatWriter
+    return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Jun 22 01:40:27 2012
@@ -20,35 +20,40 @@ package org.apache.hcatalog.data.transfe
 
 import java.util.Map;
 
-/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common,
- *  so this class contains the common fields.
+/**
+ * This is a base class for 
+ * {@link ReadEntity.Builder} / {@link WriteEntity.Builder}.
+ * Many fields in them are common, so this class
+ * contains the common fields.
  */
 
 abstract class EntityBase {
 
-	String region;
-	String tableName;
-	String dbName;
-	Map<String,String> partitionKVs;
-
-
-
-	/** Common methods for {@link ReadEntity} and {@link WriteEntity}
-	 */
-
-	abstract static class Entity extends EntityBase{
-
-		public String getRegion() {
-			return region;
-		}
-		public String getTableName() {
-			return tableName;
-		}
-		public String getDbName() {
-			return dbName;
-		}
-		public Map<String, String> getPartitionKVs() {
-			return partitionKVs;
-		}
-	}
+  String region;
+  String tableName;
+  String dbName;
+  Map<String, String> partitionKVs;
+
+  /**
+   * Common methods for {@link ReadEntity} and {@link WriteEntity}
+   */
+
+  abstract static class Entity extends EntityBase {
+
+    public String getRegion() {
+      return region;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public String getDbName() {
+      return dbName;
+    }
+
+    public Map<String, String> getPartitionKVs() {
+      return partitionKVs;
+    }
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Jun 22 01:40:27 2012
@@ -27,65 +27,75 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
 
-/** This abstract class is internal to HCatalog and abstracts away the notion of 
- * underlying system from which reads will be done. 
+/**
+ * This abstract class is internal to HCatalog and abstracts away the notion of
+ * underlying system from which reads will be done.
  */
 
-public abstract class HCatReader{
+public abstract class HCatReader {
 
-	/** This should be called at master node to obtain {@link ReaderContext} which then should be
-	 * serialized and sent to slave nodes.
-	 * @return {@link ReaderContext}
-	 * @throws HCatException
-	 */
-	public abstract ReaderContext prepareRead() throws HCatException;
-	
-	/** This should be called at slave nodes to read {@link HCatRecord}s
-	 * @return {@link Iterator} of {@link HCatRecord}
-	 * @throws HCatException
-	 */
-	public abstract Iterator<HCatRecord> read() throws HCatException;
-	
-	/** This constructor will be invoked by {@link DataTransferFactory} at master node.
-	 * Don't use this constructor. Instead, use {@link DataTransferFactory} 
- 	 * @param re
-	 * @param config
-	 */
-	protected HCatReader(final ReadEntity re, final Map<String,String> config) {
-		this(config);
-		this.re = re;
-	}
-
-	/** This constructor will be invoked by {@link DataTransferFactory} at slave nodes.
-	 * Don't use this constructor. Instead, use {@link DataTransferFactory} 
-	 * @param config
-	 * @param sp
-	 */
-	
-	protected HCatReader(final Configuration config, StateProvider sp) {
-		this.conf = config;
-		this.sp = sp;
-	}
-	
-	protected ReadEntity re;        // This will be null at slaves.
-	protected Configuration conf;   
-	protected ReaderContext info;
-	protected StateProvider sp;   // This will be null at master.
-	
-	private HCatReader(final Map<String,String> config) {
-		Configuration conf = new Configuration();
-		if (null != config) {
-			for(Entry<String, String> kv : config.entrySet()){
-				conf.set(kv.getKey(), kv.getValue());
-			}			
-		}
-		this.conf = conf;	
-	}
-	
-	public Configuration getConf() {
-		if (null == conf) {
-			throw new IllegalStateException("HCatReader is not constructed correctly.");
-		}
-		return conf;
-	}
+  /**
+   * This should be called at master node to obtain {@link ReaderContext} which
+   * then should be serialized and sent to slave nodes.
+   * 
+   * @return {@link ReaderContext}
+   * @throws HCatException
+   */
+  public abstract ReaderContext prepareRead() throws HCatException;
+
+  /**
+   * This should be called at slave nodes to read {@link HCatRecord}s
+   * 
+   * @return {@link Iterator} of {@link HCatRecord}
+   * @throws HCatException
+   */
+  public abstract Iterator<HCatRecord> read() throws HCatException;
+
+  /**
+   * This constructor will be invoked by {@link DataTransferFactory} at master
+   * node. Don't use this constructor. Instead, use {@link DataTransferFactory}
+   * 
+   * @param re
+   * @param config
+   */
+  protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+    this(config);
+    this.re = re;
+  }
+
+  /**
+   * This constructor will be invoked by {@link DataTransferFactory} at slave
+   * nodes. Don't use this constructor. Instead, use {@link DataTransferFactory}
+   * 
+   * @param config
+   * @param sp
+   */
+
+  protected HCatReader(final Configuration config, StateProvider sp) {
+    this.conf = config;
+    this.sp = sp;
+  }
+
+  protected ReadEntity re; // This will be null at slaves.
+  protected Configuration conf;
+  protected ReaderContext info;
+  protected StateProvider sp; // This will be null at master.
+
+  private HCatReader(final Map<String, String> config) {
+    Configuration conf = new Configuration();
+    if (null != config) {
+      for (Entry<String, String> kv : config.entrySet()) {
+        conf.set(kv.getKey(), kv.getValue());
+      }
+    }
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    if (null == conf) {
+      throw new IllegalStateException(
+          "HCatReader is not constructed correctly.");
+    }
+    return conf;
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Jun 22 01:40:27 2012
@@ -27,69 +27,87 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
 
-/** This abstraction is internal to HCatalog. This is to facilitate writing to HCatalog from external
- * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory}
+/**
+ * This abstraction is internal to HCatalog. This is to facilitate writing to
+ * HCatalog from external systems. Don't try to instantiate this directly.
+ * Instead, use {@link DataTransferFactory}
  */
 
 public abstract class HCatWriter {
 
-	protected Configuration conf;
-	protected WriteEntity we;     // This will be null at slave nodes.
-	protected WriterContext info; 
-	protected StateProvider sp;
-	
-	/** External system should invoke this method exactly once from a master node.
-	 * @return {@link WriterContext} This should be serialized and sent to slave nodes to 
-	 * construct HCatWriter there.
-	 * @throws HCatException
-	 */
-	public abstract WriterContext prepareWrite() throws HCatException;
-	
-	/** This method should be used at slave needs to perform writes. 
-	 * @param recordItr {@link Iterator} records to be written into HCatalog.
-	 * @throws {@link HCatException}
-	 */
-	public abstract void write(final Iterator<HCatRecord> recordItr) throws HCatException;
-	
-	/** This method should be called at master node. Primary purpose of this is to do metadata commit.
-	 * @throws {@link HCatException}
-	 */
-	public abstract void commit(final WriterContext context) throws HCatException;
-	
-	/** This method should be called at master node. Primary purpose of this is to do cleanups in case 
-	 * of failures.
-	 * @throws {@link HCatException}	 * 
-	 */
-	public abstract void abort(final WriterContext context) throws HCatException;
-	
-	/**
-	 * This constructor will be used at master node
-	 * @param we WriteEntity defines where in storage records should be written to.
-	 * @param config Any configuration which external system wants to communicate to HCatalog 
-	 * for performing writes.
-	 */
-	protected HCatWriter(final WriteEntity we, final Map<String,String> config) {
-		this(config);
-		this.we = we;
-	}
-	
-	/** This constructor will be used at slave nodes.
-	 * @param config
-	 */
-	protected HCatWriter(final Configuration config, final StateProvider sp) {
-		this.conf = config;
-		this.sp = sp;
-	}
-
-	private HCatWriter(final Map<String,String> config) {
-		Configuration conf = new Configuration();
-		if(config != null){
-			// user is providing config, so it could be null.
-			for(Entry<String, String> kv : config.entrySet()){
-				conf.set(kv.getKey(), kv.getValue());
-			}			
-		}
+  protected Configuration conf;
+  protected WriteEntity we; // This will be null at slave nodes.
+  protected WriterContext info;
+  protected StateProvider sp;
 
-		this.conf = conf;	
-	}
+  /**
+   * External system should invoke this method exactly once from a master node.
+   * 
+   * @return {@link WriterContext} This should be serialized and sent to slave
+   *         nodes to construct HCatWriter there.
+   * @throws HCatException
+   */
+  public abstract WriterContext prepareWrite() throws HCatException;
+
+  /**
+   * This method should be used at slave needs to perform writes.
+   * 
+   * @param recordItr
+   *          {@link Iterator} records to be written into HCatalog.
+   * @throws {@link HCatException}
+   */
+  public abstract void write(final Iterator<HCatRecord> recordItr)
+      throws HCatException;
+
+  /**
+   * This method should be called at master node. Primary purpose of this is to
+   * do metadata commit.
+   * 
+   * @throws {@link HCatException}
+   */
+  public abstract void commit(final WriterContext context) throws HCatException;
+
+  /**
+   * This method should be called at master node. Primary purpose of this is to
+   * do cleanups in case of failures.
+   * 
+   * @throws {@link HCatException} *
+   */
+  public abstract void abort(final WriterContext context) throws HCatException;
+
+  /**
+   * This constructor will be used at master node
+   * 
+   * @param we
+   *          WriteEntity defines where in storage records should be written to.
+   * @param config
+   *          Any configuration which external system wants to communicate to
+   *          HCatalog for performing writes.
+   */
+  protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+    this(config);
+    this.we = we;
+  }
+
+  /**
+   * This constructor will be used at slave nodes.
+   * 
+   * @param config
+   */
+  protected HCatWriter(final Configuration config, final StateProvider sp) {
+    this.conf = config;
+    this.sp = sp;
+  }
+
+  private HCatWriter(final Map<String, String> config) {
+    Configuration conf = new Configuration();
+    if (config != null) {
+      // user is providing config, so it could be null.
+      for (Entry<String, String> kv : config.entrySet()) {
+        conf.set(kv.getKey(), kv.getValue());
+      }
+    }
+
+    this.conf = conf;
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Jun 22 01:40:27 2012
@@ -20,66 +20,69 @@ package org.apache.hcatalog.data.transfe
 
 import java.util.Map;
 
-public class ReadEntity extends EntityBase.Entity{
+public class ReadEntity extends EntityBase.Entity {
 
-	private String filterString;
+  private String filterString;
 
-	/** Don't instantiate {@link ReadEntity} directly. Use, {@link ReadEntity.Builder} instead.
-	 * 
-	 */
-	private ReadEntity() {
-		// Not allowed
-	}
-	
-	private ReadEntity(Builder builder) {
-
-		this.region       = builder.region;
-		this.dbName       = builder.dbName;
-		this.tableName    = builder.tableName;
-		this.partitionKVs = builder.partitionKVs;
-		this.filterString = builder.filterString;
-	}
-
-	public String getFilterString() {
-		return this.filterString;
-	}
-
-	/** This class should be used to build {@link ReadEntity}. It follows builder pattern, letting you build
-	 * your {@link ReadEntity} with whatever level of detail you want.
-	 *
-	 */
-	public static class Builder extends EntityBase {
-
-		private String filterString;
-
-		public Builder withRegion(final String region) {
-			this.region = region;
-			return this;
-		}
-
-
-		public Builder withDatabase(final String dbName) {
-			this.dbName = dbName;
-			return this;
-		}
-
-		public Builder withTable(final String tblName) {
-			this.tableName = tblName;
-			return this;
-		}
-
-		public Builder withPartition(final Map<String,String> partKVs) {
-			this.partitionKVs = partKVs;
-			return this;
-		}
-
-		public Builder withFilter(String filterString) {
-			this.filterString = filterString;
-			return this;
-		}
-
-		public ReadEntity build() {
-			return new ReadEntity(this);
-		}
-	}
+  /**
+   * Don't instantiate {@link ReadEntity} directly. Use,
+   * {@link ReadEntity.Builder} instead.
+   * 
+   */
+  private ReadEntity() {
+    // Not allowed
+  }
+
+  private ReadEntity(Builder builder) {
+
+    this.region = builder.region;
+    this.dbName = builder.dbName;
+    this.tableName = builder.tableName;
+    this.partitionKVs = builder.partitionKVs;
+    this.filterString = builder.filterString;
+  }
+
+  public String getFilterString() {
+    return this.filterString;
+  }
+
+  /**
+   * This class should be used to build {@link ReadEntity}. It follows builder
+   * pattern, letting you build your {@link ReadEntity} with whatever level of
+   * detail you want.
+   * 
+   */
+  public static class Builder extends EntityBase {
+
+    private String filterString;
+
+    public Builder withRegion(final String region) {
+      this.region = region;
+      return this;
+    }
+
+    public Builder withDatabase(final String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public Builder withTable(final String tblName) {
+      this.tableName = tblName;
+      return this;
+    }
+
+    public Builder withPartition(final Map<String, String> partKVs) {
+      this.partitionKVs = partKVs;
+      return this;
+    }
+
+    public Builder withFilter(String filterString) {
+      this.filterString = filterString;
+      return this;
+    }
+
+    public ReadEntity build() {
+      return new ReadEntity(this);
+    }
+  }
 }
\ No newline at end of file

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Jun 22 01:40:27 2012
@@ -30,57 +30,59 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hcatalog.mapreduce.HCatSplit;
 
-/** This class will contain information of different {@link InputSplit} obtained at master node
- * and configuration. This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms.
+/**
+ * This class will contain information of different {@link InputSplit} obtained
+ * at master node and configuration. This class implements
+ * {@link Externalizable} so it can be serialized using standard java
+ * mechanisms.
  */
 public class ReaderContext implements Externalizable, Configurable {
 
-	private static final long serialVersionUID = -2656468331739574367L;
-	private List<InputSplit> splits;
-	private Configuration conf;
-
-	public ReaderContext() {
-		this.splits = new ArrayList<InputSplit>();
-		this.conf = new Configuration();
-	}
-	
-	public void setInputSplits(final List<InputSplit> splits) {
-		this.splits = splits;
-	}
-	
-	public List<InputSplit> getSplits() {
-		return splits;
-	}
-	
-	@Override
-	public Configuration getConf() {
-		return conf;
-	}
-
-	@Override
-	public void setConf(final Configuration config) {
-		conf = config;
-	}
-
-	@Override
-	public void writeExternal(ObjectOutput out) throws IOException {
-		conf.write(out);
-		out.writeInt(splits.size());
-		for (InputSplit split : splits) {
-			((HCatSplit)split).write(out);
-		}
-	}
-
-	@Override
-	public void readExternal(ObjectInput in) throws IOException,
-			ClassNotFoundException {
-		conf.readFields(in);
-		int numOfSplits = in.readInt();
-		for (int i=0 ; i < numOfSplits; i++) {
-			HCatSplit split = new HCatSplit();
-			split.readFields(in);
-			splits.add(split);
-		}
-	}
+  private static final long serialVersionUID = -2656468331739574367L;
+  private List<InputSplit> splits;
+  private Configuration conf;
+
+  public ReaderContext() {
+    this.splits = new ArrayList<InputSplit>();
+    this.conf = new Configuration();
+  }
+
+  public void setInputSplits(final List<InputSplit> splits) {
+    this.splits = splits;
+  }
+
+  public List<InputSplit> getSplits() {
+    return splits;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(final Configuration config) {
+    conf = config;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    conf.write(out);
+    out.writeInt(splits.size());
+    for (InputSplit split : splits) {
+      ((HCatSplit) split).write(out);
+    }
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException,
+      ClassNotFoundException {
+    conf.readFields(in);
+    int numOfSplits = in.readInt();
+    for (int i = 0; i < numOfSplits; i++) {
+      HCatSplit split = new HCatSplit();
+      split.readFields(in);
+      splits.add(split);
+    }
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Jun 22 01:40:27 2012
@@ -20,52 +20,55 @@ package org.apache.hcatalog.data.transfe
 
 import java.util.Map;
 
-public class WriteEntity extends EntityBase.Entity{
+public class WriteEntity extends EntityBase.Entity {
 
-	/** Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to build 
-	 * {@link WriteEntity}.
-	 */
-
-	private WriteEntity() {
-		// Not allowed.
-	}
-	
-	private WriteEntity(Builder builder) {
-		this.region = builder.region;
-		this.dbName = builder.dbName;
-		this.tableName = builder.tableName;
-		this.partitionKVs = builder.partitionKVs;
-	}
-	
-	/** This class should be used to build {@link WriteEntity}. It follows builder pattern, letting you build
-	 * your {@link WriteEntity} with whatever level of detail you want.
-	 *
-	 */
-	public static class Builder extends EntityBase{
-		
-		public Builder withRegion(final String region) {
-			this.region = region;
-			return this;
-		}
-		
-		public Builder withDatabase(final String dbName) {
-			this.dbName = dbName;
-			return this;
-		}
-		
-		public Builder withTable(final String tblName) {
-			this.tableName = tblName;
-			return this;
-		}
-		
-		public Builder withPartition(final Map<String,String> partKVs) {
-			this.partitionKVs = partKVs;
-			return this;
-		}
-		
-		public WriteEntity build() {
-			return new WriteEntity(this);
-		}
-		
-	}
+  /**
+   * Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to
+   * build {@link WriteEntity}.
+   */
+
+  private WriteEntity() {
+    // Not allowed.
+  }
+
+  private WriteEntity(Builder builder) {
+    this.region = builder.region;
+    this.dbName = builder.dbName;
+    this.tableName = builder.tableName;
+    this.partitionKVs = builder.partitionKVs;
+  }
+
+  /**
+   * This class should be used to build {@link WriteEntity}. It follows builder
+   * pattern, letting you build your {@link WriteEntity} with whatever level of
+   * detail you want.
+   * 
+   */
+  public static class Builder extends EntityBase {
+
+    public Builder withRegion(final String region) {
+      this.region = region;
+      return this;
+    }
+
+    public Builder withDatabase(final String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public Builder withTable(final String tblName) {
+      this.tableName = tblName;
+      return this;
+    }
+
+    public Builder withPartition(final Map<String, String> partKVs) {
+      this.partitionKVs = partKVs;
+      return this;
+    }
+
+    public WriteEntity build() {
+      return new WriteEntity(this);
+    }
+
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Jun 22 01:40:27 2012
@@ -26,38 +26,39 @@ import java.io.ObjectOutput;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 
-/** This contains information obtained at master node to help prepare slave nodes for writer. 
- * This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms. Master should serialize it and make it available to slaves to
- * prepare for writes.
+/**
+ * This contains information obtained at master node to help prepare slave nodes
+ * for writer. This class implements {@link Externalizable} so it can be
+ * serialized using standard java mechanisms. Master should serialize it and
+ * make it available to slaves to prepare for writes.
  */
-public class WriterContext implements Externalizable, Configurable{
+public class WriterContext implements Externalizable, Configurable {
 
-	private static final long serialVersionUID = -5899374262971611840L;
-	private Configuration conf;
+  private static final long serialVersionUID = -5899374262971611840L;
+  private Configuration conf;
 
-	public WriterContext() {
-		conf = new Configuration();
-	}
-	
-	@Override
-	public Configuration getConf() {
-		return conf;
-	}
-	
-	@Override
-	public void setConf(final Configuration config) {
-		this.conf = config;
-	}
-
-	@Override
-	public void writeExternal(ObjectOutput out) throws IOException {
-		conf.write(out);
-	}
-
-	@Override
-	public void readExternal(ObjectInput in) throws IOException,
-			ClassNotFoundException {
-		conf.readFields(in);
-	}
+  public WriterContext() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(final Configuration config) {
+    this.conf = config;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    conf.write(out);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException,
+      ClassNotFoundException {
+    conf.readFields(in);
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Jun 22 01:40:27 2012
@@ -40,98 +40,102 @@ import org.apache.hcatalog.data.transfer
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
-/** This reader reads via {@link HCatInputFormat}
+/**
+ * This reader reads via {@link HCatInputFormat}
  * 
  */
-public class HCatInputFormatReader extends HCatReader{
+public class HCatInputFormatReader extends HCatReader {
 
-	private InputSplit split;
-	
-	public HCatInputFormatReader(InputSplit split, Configuration config, StateProvider sp) {
-		super(config, sp);
-		this.split = split;
-	}
-
-	public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
-		super(info,config);
-	}
-
-	@Override
-	public ReaderContext prepareRead() throws HCatException {
-
-		try {
-			Job job = new Job(conf);
-			InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString());
-			HCatInputFormat.setInput(job, jobInfo);
-			HCatInputFormat hcif = new HCatInputFormat();
-			ReaderContext cntxt = new ReaderContext();
-			cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null)));
-			cntxt.setConf(job.getConfiguration());
-			return cntxt;
-		} catch (IOException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		} catch (InterruptedException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED,e);
-		}
-	}
-
-	@Override
-	public Iterator<HCatRecord> read() throws HCatException {
-
-		HCatInputFormat inpFmt = new HCatInputFormat();
-		RecordReader<WritableComparable, HCatRecord> rr;
-		try {
-			TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID());
-			rr = inpFmt.createRecordReader(split, cntxt);
-			rr.initialize(split, cntxt);
-		} catch (IOException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		} catch (InterruptedException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		}
-		return new HCatRecordItr(rr);
-	}
-
-
-	private static class HCatRecordItr implements Iterator<HCatRecord>{
-
-		private RecordReader<WritableComparable, HCatRecord> curRecReader;
-
-		HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
-			curRecReader = rr;
-		}
-
-		@Override
-		public boolean hasNext(){
-			try {
-				boolean retVal = curRecReader.nextKeyValue();
-				if (retVal) {
-					return true;
-				}
-				// if its false, we need to close recordReader.
-				curRecReader.close();
-				return false;
-			} catch (IOException e) {
-				throw new RuntimeException(e);
-			} catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public HCatRecord next() {
-			try {
-				return curRecReader.getCurrentValue();
-			} catch (IOException e) {
-				throw new RuntimeException(e);
-			} catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Not allowed");
-		}
-	}
+  private InputSplit split;
+
+  public HCatInputFormatReader(InputSplit split, Configuration config,
+      StateProvider sp) {
+    super(config, sp);
+    this.split = split;
+  }
+
+  public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+    super(info, config);
+  }
+
+  @Override
+  public ReaderContext prepareRead() throws HCatException {
+
+    try {
+      Job job = new Job(conf);
+      InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
+          re.getTableName(), re.getFilterString());
+      HCatInputFormat.setInput(job, jobInfo);
+      HCatInputFormat hcif = new HCatInputFormat();
+      ReaderContext cntxt = new ReaderContext();
+      cntxt.setInputSplits(hcif.getSplits(new JobContext(
+          job.getConfiguration(), null)));
+      cntxt.setConf(job.getConfiguration());
+      return cntxt;
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  @Override
+  public Iterator<HCatRecord> read() throws HCatException {
+
+    HCatInputFormat inpFmt = new HCatInputFormat();
+    RecordReader<WritableComparable, HCatRecord> rr;
+    try {
+      TaskAttemptContext cntxt = new TaskAttemptContext(conf,
+          new TaskAttemptID());
+      rr = inpFmt.createRecordReader(split, cntxt);
+      rr.initialize(split, cntxt);
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+    return new HCatRecordItr(rr);
+  }
+
+  private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+    private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+    HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+      curRecReader = rr;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        boolean retVal = curRecReader.nextKeyValue();
+        if (retVal) {
+          return true;
+        }
+        // if its false, we need to close recordReader.
+        curRecReader.close();
+        return false;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public HCatRecord next() {
+      try {
+        return curRecReader.getCurrentValue();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Not allowed");
+    }
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Jun 22 01:40:27 2012
@@ -42,112 +42,121 @@ import org.apache.hcatalog.data.transfer
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
-/** This writer writes via {@link HCatOutputFormat}
+/**
+ * This writer writes via {@link HCatOutputFormat}
  * 
  */
 public class HCatOutputFormatWriter extends HCatWriter {
 
-	public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
-		super(we, config);
-	}
-
-	public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
-		super(config, sp);
-	}
-
-	@Override
-	public WriterContext prepareWrite() throws HCatException {
-		OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs());
-		Job job;
-		try {
-			job = new Job(conf);
-			HCatOutputFormat.setOutput(job, jobInfo);
-			HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
-			HCatOutputFormat outFormat = new HCatOutputFormat();
-			outFormat.checkOutputSpecs(job);
-			outFormat.getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).setupJob(job);
-		} catch (IOException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		} catch (InterruptedException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		}
-		WriterContext cntxt = new WriterContext(); 
-		cntxt.setConf(job.getConfiguration());
-		return cntxt;
-	}
-
-	@Override
-	public void write(Iterator<HCatRecord> recordItr) throws HCatException {
-		
-		int id = sp.getId();
-		setVarsInConf(id);
-		HCatOutputFormat outFormat = new HCatOutputFormat();
-		TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), id));
-		OutputCommitter committer = null;
-		RecordWriter<WritableComparable<?>, HCatRecord> writer;
-		try {
-			committer = outFormat.getOutputCommitter(cntxt);
-			committer.setupTask(cntxt);
-			writer   = outFormat.getRecordWriter(cntxt);
-			while(recordItr.hasNext()){
-				HCatRecord rec = recordItr.next();
-				writer.write(null, rec);
-			}
-			writer.close(cntxt);
-			if(committer.needsTaskCommit(cntxt)){
-				committer.commitTask(cntxt);				
-			}
-		} catch (IOException e) {
-			if(null != committer) {
-				try {
-					committer.abortTask(cntxt);
-				} catch (IOException e1) {
-					throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
-				}
-			}
-			throw new HCatException("Failed while writing",e);
-		} catch (InterruptedException e) {
-			if(null != committer) {
-				try {
-					committer.abortTask(cntxt);
-				} catch (IOException e1) {
-					throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
-				}
-			}
-			throw new HCatException("Failed while writing", e);
-		}
-	}
-
-	@Override
-	public void commit(WriterContext context) throws HCatException {
-		try {
-			new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
-			.commitJob(new JobContext(context.getConf(), null));
-		} catch (IOException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		} catch (InterruptedException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		}
-	}
-
-	@Override
-	public void abort(WriterContext context) throws HCatException {
-		try {
-			new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
-			.abortJob(new JobContext(context.getConf(), null),State.FAILED);
-		} catch (IOException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		} catch (InterruptedException e) {
-			throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
-		}
-	}
-	
-	private void setVarsInConf(int id) {
-		
-		// Following two config keys are required by FileOutputFormat to work correctly.
-		// In usual case of Hadoop, JobTracker will set these before launching tasks.
-		// Since there is no jobtracker here, we set it ourself.
-		conf.setInt("mapred.task.partition", id);
-		conf.set("mapred.task.id", "attempt__0000_r_000000_"+id); 
-	}
+  public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+    super(we, config);
+  }
+
+  public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+    super(config, sp);
+  }
+
+  @Override
+  public WriterContext prepareWrite() throws HCatException {
+    OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+        we.getTableName(), we.getPartitionKVs());
+    Job job;
+    try {
+      job = new Job(conf);
+      HCatOutputFormat.setOutput(job, jobInfo);
+      HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+      HCatOutputFormat outFormat = new HCatOutputFormat();
+      outFormat.checkOutputSpecs(job);
+      outFormat.getOutputCommitter(
+          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()))
+          .setupJob(job);
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+    WriterContext cntxt = new WriterContext();
+    cntxt.setConf(job.getConfiguration());
+    return cntxt;
+  }
+
+  @Override
+  public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+    int id = sp.getId();
+    setVarsInConf(id);
+    HCatOutputFormat outFormat = new HCatOutputFormat();
+    TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(
+        new TaskID(), id));
+    OutputCommitter committer = null;
+    RecordWriter<WritableComparable<?>, HCatRecord> writer;
+    try {
+      committer = outFormat.getOutputCommitter(cntxt);
+      committer.setupTask(cntxt);
+      writer = outFormat.getRecordWriter(cntxt);
+      while (recordItr.hasNext()) {
+        HCatRecord rec = recordItr.next();
+        writer.write(null, rec);
+      }
+      writer.close(cntxt);
+      if (committer.needsTaskCommit(cntxt)) {
+        committer.commitTask(cntxt);
+      }
+    } catch (IOException e) {
+      if (null != committer) {
+        try {
+          committer.abortTask(cntxt);
+        } catch (IOException e1) {
+          throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+        }
+      }
+      throw new HCatException("Failed while writing", e);
+    } catch (InterruptedException e) {
+      if (null != committer) {
+        try {
+          committer.abortTask(cntxt);
+        } catch (IOException e1) {
+          throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+        }
+      }
+      throw new HCatException("Failed while writing", e);
+    }
+  }
+
+  @Override
+  public void commit(WriterContext context) throws HCatException {
+    try {
+      new HCatOutputFormat().getOutputCommitter(
+          new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+          .commitJob(new JobContext(context.getConf(), null));
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  @Override
+  public void abort(WriterContext context) throws HCatException {
+    try {
+      new HCatOutputFormat().getOutputCommitter(
+          new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+          .abortJob(new JobContext(context.getConf(), null), State.FAILED);
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  private void setVarsInConf(int id) {
+
+    // Following two config keys are required by FileOutputFormat to work
+    // correctly.
+    // In usual case of Hadoop, JobTracker will set these before launching
+    // tasks.
+    // Since there is no jobtracker here, we set it ourself.
+    conf.setInt("mapred.task.partition", id);
+    conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Jun 22 01:40:27 2012
@@ -21,26 +21,27 @@ package org.apache.hcatalog.data.transfe
 import java.text.NumberFormat;
 import java.util.Random;
 
-
 public class DefaultStateProvider implements StateProvider {
 
-	/** Default implementation. Here, ids are generated randomly.
-	 */
-	@Override
-	public int getId() {
-		
-		NumberFormat numberFormat = NumberFormat.getInstance();
-	    numberFormat.setMinimumIntegerDigits(5);
-	    numberFormat.setGroupingUsed(false);
-		return Integer.parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
-	}
+  /**
+   * Default implementation. Here, ids are generated randomly.
+   */
+  @Override
+  public int getId() {
+
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+    return Integer
+        .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+  }
+
+  private static StateProvider sp;
 
-	private static StateProvider sp;
-	
-	public static synchronized StateProvider get() {
-		if (null == sp) {
-			sp = new DefaultStateProvider();
-		}
-		return sp;
-	}
+  public static synchronized StateProvider get() {
+    if (null == sp) {
+      sp = new DefaultStateProvider();
+    }
+    return sp;
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Jun 22 01:40:27 2012
@@ -21,14 +21,17 @@ package org.apache.hcatalog.data.transfe
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.TaskTracker;
 
-/** If external system wants to communicate any state to slaves, they can do so via this interface.
- * One example of this in case of Map-Reduce is ids assigned by {@link JobTracker} to 
- * {@link TaskTracker}
+/**
+ * If external system wants to communicate any state to slaves, they can do so
+ * via this interface. One example of this in case of Map-Reduce is ids assigned
+ * by {@link JobTracker} to {@link TaskTracker}
  */
 public interface StateProvider {
 
-	/** This method should return id assigned to slave node.
-	 * @return id
-	 */
-	public int getId();
+  /**
+   * This method should return id assigned to slave node.
+   * 
+   * @return id
+   */
+  public int getId();
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Fri Jun 22 01:40:27 2012
@@ -64,288 +64,310 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
- * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}
- * It sends message on two type of topics. One has name of form dbName.tblName
- * On this topic, two kind of messages are sent: add/drop partition and 
- * finalize_partition message.
- * Second topic has name "HCAT" and messages sent on it are: add/drop database
- * and add/drop table.
- * All messages also has a property named "HCAT_EVENT" set on them whose value
- * can be used to configure message selector on subscriber side.  
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends
+ * message on two type of topics. One has name of form dbName.tblName On this
+ * topic, two kind of messages are sent: add/drop partition and
+ * finalize_partition message. Second topic has name "HCAT" and messages sent on
+ * it are: add/drop database and add/drop table. All messages also has a
+ * property named "HCAT_EVENT" set on them whose value can be used to configure
+ * message selector on subscriber side.
  */
-public class NotificationListener extends MetaStoreEventListener{
+public class NotificationListener extends MetaStoreEventListener {
 
-	private static final Log LOG = LogFactory.getLog(NotificationListener.class);
-	protected Session session;
-	protected Connection conn;
-
-	/**
-	 * Create message bus connection and session in constructor.
-	 */
-	public NotificationListener(final Configuration conf) {
-
-		super(conf);
-		createConnection();
-	}
-
-	private static String getTopicName(Partition partition,
-			ListenerEvent partitionEvent) throws MetaException {
-		try {
-			return partitionEvent.getHandler()
-					.get_table(partition.getDbName(), partition.getTableName())
-					.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
-		} catch (NoSuchObjectException e) {
-			throw new MetaException(e.toString());
-		}
-	}
-	
-	@Override
-	public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
-		// Subscriber can get notification of newly add partition in a 
-		// particular table by listening on a topic named "dbName.tableName" 
-		// and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" 
-		if(partitionEvent.getStatus()){
+  private static final Log LOG = LogFactory.getLog(NotificationListener.class);
+  protected Session session;
+  protected Connection conn;
+
+  /**
+   * Create message bus connection and session in constructor.
+   */
+  public NotificationListener(final Configuration conf) {
+
+    super(conf);
+    createConnection();
+  }
+
+  private static String getTopicName(Partition partition,
+      ListenerEvent partitionEvent) throws MetaException {
+    try {
+      return partitionEvent.getHandler()
+          .get_table(partition.getDbName(), partition.getTableName())
+          .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+    } catch (NoSuchObjectException e) {
+      throw new MetaException(e.toString());
+    }
+  }
+
+  @Override
+  public void onAddPartition(AddPartitionEvent partitionEvent)
+      throws MetaException {
+    // Subscriber can get notification of newly add partition in a
+    // particular table by listening on a topic named "dbName.tableName"
+    // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
+    if (partitionEvent.getStatus()) {
 
-			Partition partition = partitionEvent.getPartition();
-			String topicName = getTopicName(partition, partitionEvent);
+      Partition partition = partitionEvent.getPartition();
+      String topicName = getTopicName(partition, partitionEvent);
       if (topicName != null && !topicName.equals("")) {
-			  send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
-      }
-      else {
-        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName()
-            + "." + partition.getTableName()
+        send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
+      } else {
+        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+            + partition.getDbName()
+            + "."
+            + partition.getTableName()
             + " To enable notifications for this table, please do alter table set properties ("
             + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
             + "=<dbname>.<tablename>) or whatever you want topic name to be.");
       }
-		}
+    }
 
-	}
+  }
 
-	@Override
-	public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
-		// Subscriber can get notification of dropped partition in a 
-		// particular table by listening on a topic named "dbName.tableName" 
-		// and message  selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" 
-
-		// Datanucleus throws NPE when we try to serialize a partition object
-		// retrieved from metastore. To workaround that we reset following objects
-
-		if(partitionEvent.getStatus()){
-			Partition partition = partitionEvent.getPartition();
-			StorageDescriptor sd = partition.getSd();
-			sd.setBucketCols(new ArrayList<String>());
-			sd.setSortCols(new ArrayList<Order>());
-			sd.setParameters(new HashMap<String, String>());
-			sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-			String topicName = getTopicName(partition, partitionEvent);
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent)
+      throws MetaException {
+    // Subscriber can get notification of dropped partition in a
+    // particular table by listening on a topic named "dbName.tableName"
+    // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
+
+    // Datanucleus throws NPE when we try to serialize a partition object
+    // retrieved from metastore. To workaround that we reset following objects
+
+    if (partitionEvent.getStatus()) {
+      Partition partition = partitionEvent.getPartition();
+      StorageDescriptor sd = partition.getSd();
+      sd.setBucketCols(new ArrayList<String>());
+      sd.setSortCols(new ArrayList<Order>());
+      sd.setParameters(new HashMap<String, String>());
+      sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+      String topicName = getTopicName(partition, partitionEvent);
       if (topicName != null && !topicName.equals("")) {
-			  send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
-      }
-      else {
-        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName()
-            + "." + partition.getTableName()
+        send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
+      } else {
+        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+            + partition.getDbName()
+            + "."
+            + partition.getTableName()
             + " To enable notifications for this table, please do alter table set properties ("
             + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
             + "=<dbname>.<tablename>) or whatever you want topic name to be.");
       }
-		}
-	}
+    }
+  }
+
+  @Override
+  public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+      throws MetaException {
+    // Subscriber can get notification about addition of a database in HCAT
+    // by listening on a topic named "HCAT" and message selector string
+    // as "HCAT_EVENT = HCAT_ADD_DATABASE"
+    if (dbEvent.getStatus())
+      send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+          .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
+  }
+
+  @Override
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+    // Subscriber can get notification about drop of a database in HCAT
+    // by listening on a topic named "HCAT" and message selector string
+    // as "HCAT_EVENT = HCAT_DROP_DATABASE"
+    if (dbEvent.getStatus())
+      send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+          .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
+  }
+
+  @Override
+  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+    // Subscriber can get notification about addition of a table in HCAT
+    // by listening on a topic named "HCAT" and message selector string
+    // as "HCAT_EVENT = HCAT_ADD_TABLE"
+    if (tableEvent.getStatus()) {
+      Table tbl = tableEvent.getTable();
+      HMSHandler handler = tableEvent.getHandler();
+      HiveConf conf = handler.getHiveConf();
+      Table newTbl;
+      try {
+        newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
+            .deepCopy();
+        newTbl.getParameters().put(
+            HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
+            getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+                + newTbl.getTableName().toLowerCase());
+        handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
+      } catch (InvalidOperationException e) {
+        MetaException me = new MetaException(e.toString());
+        me.initCause(e);
+        throw me;
+      } catch (NoSuchObjectException e) {
+        MetaException me = new MetaException(e.toString());
+        me.initCause(e);
+        throw me;
+      }
+      send(newTbl, getTopicPrefix(conf) + "."
+          + newTbl.getDbName().toLowerCase(),
+          HCatConstants.HCAT_ADD_TABLE_EVENT);
+    }
+  }
+
+  private String getTopicPrefix(HiveConf conf) {
+    return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
+        HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+    // Subscriber can get notification about drop of a table in HCAT
+    // by listening on a topic named "HCAT" and message selector string
+    // as "HCAT_EVENT = HCAT_DROP_TABLE"
+
+    // Datanucleus throws NPE when we try to serialize a table object
+    // retrieved from metastore. To workaround that we reset following objects
+
+    if (tableEvent.getStatus()) {
+      Table table = tableEvent.getTable();
+      StorageDescriptor sd = table.getSd();
+      sd.setBucketCols(new ArrayList<String>());
+      sd.setSortCols(new ArrayList<Order>());
+      sd.setParameters(new HashMap<String, String>());
+      sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+      send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
+          + table.getDbName().toLowerCase(),
+          HCatConstants.HCAT_DROP_TABLE_EVENT);
+    }
+  }
+
+  /**
+   * @param msgBody
+   *          is the metastore object. It is sent in full such that if
+   *          subscriber is really interested in details, it can reconstruct it
+   *          fully. In case of finalize_partition message this will be string
+   *          specification of the partition.
+   * @param topicName
+   *          is the name on message broker on which message is sent.
+   * @param event
+   *          is the value of HCAT_EVENT property in message. It can be used to
+   *          select messages in client side.
+   */
+  protected void send(Object msgBody, String topicName, String event) {
+
+    try {
+
+      Destination topic = null;
+      if (null == session) {
+        // this will happen, if we never able to establish a connection.
+        createConnection();
+        if (null == session) {
+          // Still not successful, return from here.
+          LOG.error("Invalid session. Failed to send message on topic: "
+              + topicName + " event: " + event);
+          return;
+        }
+      }
+      try {
+        // Topics are created on demand. If it doesn't exist on broker it will
+        // be created when broker receives this message.
+        topic = session.createTopic(topicName);
+      } catch (IllegalStateException ise) {
+        // this will happen if we were able to establish connection once, but
+        // its no longer valid,
+        // ise is thrown, catch it and retry.
+        LOG.error("Seems like connection is lost. Retrying", ise);
+        createConnection();
+        topic = session.createTopic(topicName);
+      }
+      if (null == topic) {
+        // Still not successful, return from here.
+        LOG.error("Invalid session. Failed to send message on topic: "
+            + topicName + " event: " + event);
+        return;
+      }
+      MessageProducer producer = session.createProducer(topic);
+      Message msg;
+      if (msgBody instanceof Map) {
+        MapMessage mapMsg = session.createMapMessage();
+        Map<String, String> incomingMap = (Map<String, String>) msgBody;
+        for (Entry<String, String> partCol : incomingMap.entrySet()) {
+          mapMsg.setString(partCol.getKey(), partCol.getValue());
+        }
+        msg = mapMsg;
+      } else {
+        msg = session.createObjectMessage((Serializable) msgBody);
+      }
+
+      msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+      producer.send(msg);
+      // Message must be transacted before we return.
+      session.commit();
+    } catch (Exception e) {
+      // Gobble up the exception. Message delivery is best effort.
+      LOG.error("Failed to send message on topic: " + topicName + " event: "
+          + event, e);
+    }
+  }
+
+  protected void createConnection() {
+
+    Context jndiCntxt;
+    try {
+      jndiCntxt = new InitialContext();
+      ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
+          .lookup("ConnectionFactory");
+      Connection conn = connFac.createConnection();
+      conn.start();
+      conn.setExceptionListener(new ExceptionListener() {
+        @Override
+        public void onException(JMSException jmse) {
+          LOG.error(jmse);
+        }
+      });
+      // We want message to be sent when session commits, thus we run in
+      // transacted mode.
+      session = conn.createSession(true, Session.SESSION_TRANSACTED);
+    } catch (NamingException e) {
+      LOG.error("JNDI error while setting up Message Bus connection. "
+          + "Please make sure file named 'jndi.properties' is in "
+          + "classpath and contains appropriate key-value pairs.", e);
+    } catch (JMSException e) {
+      LOG.error("Failed to initialize connection to message bus", e);
+    } catch (Throwable t) {
+      LOG.error("Unable to connect to JMS provider", t);
+    }
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    // Close the connection before dying.
+    try {
+      if (null != session)
+        session.close();
+      if (conn != null) {
+        conn.close();
+      }
 
-	@Override
-	public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
-		// Subscriber can get notification about addition of a database in HCAT
-		// by listening on a topic named "HCAT" and message selector string
-		// as "HCAT_EVENT = HCAT_ADD_DATABASE" 
-		if(dbEvent.getStatus())
-			send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT);
-	}
-
-	@Override
-	public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
-		// Subscriber can get notification about drop of a database in HCAT
-		// by listening on a topic named "HCAT" and message selector string
-		// as "HCAT_EVENT = HCAT_DROP_DATABASE" 
-		if(dbEvent.getStatus())
-			send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT);
-	}
-
-	@Override
-	public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
-		// Subscriber can get notification about addition of  a table in HCAT
-		// by listening on a topic named "HCAT" and message selector string
-		// as "HCAT_EVENT = HCAT_ADD_TABLE" 
-		if(tableEvent.getStatus()){
-			Table tbl = tableEvent.getTable();
-			HMSHandler handler = tableEvent.getHandler();
-			HiveConf conf = handler.getHiveConf();
-			Table newTbl;
-			try {
-				newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy();
-				newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, 
-						getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase()
-						+"." + newTbl.getTableName().toLowerCase());
-				handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
-			} catch (InvalidOperationException e) {
-				 MetaException me  = new MetaException(e.toString());
-				 me.initCause(e);
-				throw me;
-			} catch (NoSuchObjectException e) {
-				 MetaException me  = new MetaException(e.toString());
-				 me.initCause(e);
-				throw me;
-			}
-			send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
-		}
-	}
-	
-	private String getTopicPrefix(HiveConf conf){
-		return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
-	}
-	
-	@Override
-	public void onDropTable(DropTableEvent tableEvent) throws MetaException {
-		// Subscriber can get notification about drop of a  table in HCAT
-		// by listening on a topic named "HCAT" and message selector string
-		// as "HCAT_EVENT = HCAT_DROP_TABLE" 
-
-		// Datanucleus throws NPE when we try to serialize a table object
-		// retrieved from metastore. To workaround that we reset following objects
-
-		if(tableEvent.getStatus()){
-			Table table = tableEvent.getTable();
-			StorageDescriptor sd = table.getSd();
-			sd.setBucketCols(new ArrayList<String>());
-			sd.setSortCols(new ArrayList<Order>());
-			sd.setParameters(new HashMap<String, String>());
-			sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-			send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName().toLowerCase(), HCatConstants.HCAT_DROP_TABLE_EVENT);	
-		}
-	}
-
-	/**
-	 * @param msgBody is the metastore object. It is sent in full such that
-	 * if subscriber is really interested in details, it can reconstruct it fully.
-	 * In case of finalize_partition message this will be string specification of 
-	 * the partition.
-	 * @param topicName is the name on message broker on which message is sent.
-	 * @param event is the value of HCAT_EVENT property in message. It can be 
-	 * used to select messages in client side. 
-	 */
-	protected void send(Object msgBody, String topicName, String event){
-
-		try{
-
-			Destination topic = null;
-			if(null == session){
-				// this will happen, if we never able to establish a connection.
-				createConnection();
-				if (null == session){
-					// Still not successful, return from here.
-					LOG.error("Invalid session. Failed to send message on topic: "+
-							topicName + " event: "+event);				
-					return;
-				}
-			}
-			try{
-				// Topics are created on demand. If it doesn't exist on broker it will
-				// be created when broker receives this message.
-				topic = session.createTopic(topicName);				
-			} catch (IllegalStateException ise){
-				// this will happen if we were able to establish connection once, but its no longer valid,
-				// ise is thrown, catch it and retry.
-				LOG.error("Seems like connection is lost. Retrying", ise);
-				createConnection();
-				topic = session.createTopic(topicName);				
-			}
-			if (null == topic){
-				// Still not successful, return from here.
-				LOG.error("Invalid session. Failed to send message on topic: "+
-						topicName + " event: "+event);				
-				return;
-			}
-			MessageProducer producer = session.createProducer(topic);
-			Message msg;
-			if (msgBody instanceof Map){
-				MapMessage mapMsg = session.createMapMessage();
-				Map<String,String> incomingMap = (Map<String,String>)msgBody;
-				for (Entry<String,String> partCol : incomingMap.entrySet()){
-					mapMsg.setString(partCol.getKey(), partCol.getValue());
-				}
-				msg = mapMsg;
-			}
-			else {
-				msg = session.createObjectMessage((Serializable)msgBody);
-			}
-
-			msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
-			producer.send(msg);
-			// Message must be transacted before we return.
-			session.commit();
-		} catch(Exception e){
-			// Gobble up the exception. Message delivery is best effort.
-			LOG.error("Failed to send message on topic: "+topicName + 
-					" event: "+event , e);
-		}
-	}
-
-	protected void createConnection(){
-
-		Context jndiCntxt;
-		try {
-			jndiCntxt = new InitialContext();
-			ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
-			Connection conn = connFac.createConnection();
-			conn.start();
-			conn.setExceptionListener(new ExceptionListener() {
-				@Override
-				public void onException(JMSException jmse) {
-						LOG.error(jmse);
-				}
-			});
-			// We want message to be sent when session commits, thus we run in
-			// transacted mode.
-			session = conn.createSession(true, Session.SESSION_TRANSACTED);
-		} catch (NamingException e) {
-			LOG.error("JNDI error while setting up Message Bus connection. " +
-					"Please make sure file named 'jndi.properties' is in " +
-					"classpath and contains appropriate key-value pairs.",e);
-		} catch (JMSException e) {
-			LOG.error("Failed to initialize connection to message bus",e);
-		} catch(Throwable t){
-			LOG.error("Unable to connect to JMS provider",t);
-		}
-	}
-
-	@Override
-	protected void finalize() throws Throwable {
-		// Close the connection before dying.
-		try {
-			if (null != session)
-				session.close();
-			if(conn != null) {
-				conn.close();
-			}
-			
-		} catch (Exception ignore) {
-			LOG.info("Failed to close message bus connection.", ignore);
-		}
-	}
-
-	@Override
-	public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
-			throws MetaException {
-		if(lpde.getStatus())
-			send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
-	}
-	
-	@Override
-	public void onAlterPartition(AlterPartitionEvent ape) throws MetaException{
-		//no-op
-	}
-	
-	@Override
-	public void onAlterTable(AlterTableEvent ate) throws MetaException {
-		// no-op
-	}
+    } catch (Exception ignore) {
+      LOG.info("Failed to close message bus connection.", ignore);
+    }
+  }
+
+  @Override
+  public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+      throws MetaException {
+    if (lpde.getStatus())
+      send(
+          lpde.getPartitionName(),
+          lpde.getTable().getParameters()
+              .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
+          HCatConstants.HCAT_PARTITION_DONE_EVENT);
+  }
+
+  @Override
+  public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onAlterTable(AlterTableEvent ate) throws MetaException {
+    // no-op
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java Fri Jun 22 01:40:27 2012
@@ -166,7 +166,7 @@ public abstract class HCatBaseStorer ext
       return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
 
     case DataType.BYTEARRAY:
-    	return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+      return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
 
     case DataType.BAG:
       Schema bagSchema = fSchema.schema;

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Jun 22 01:40:27 2012
@@ -156,7 +156,7 @@ public class HCatStorer extends HCatBase
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
         getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
-        		job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+            job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
       } catch (IOException e) {
         throw new IOException("Failed to cleanup job",e);
       } catch (InterruptedException e) {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java Fri Jun 22 01:40:27 2012
@@ -91,7 +91,7 @@ public class StorageDelegationAuthorizat
         
         //else we do not have anything to delegate to
         throw new HiveException(String.format("Storage Handler for table:%s is not an instance " +
-        		"of HCatStorageHandler", table.getTableName()));
+            "of HCatStorageHandler", table.getTableName()));
       }
     } else {
       //return an authorizer for HDFS

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Fri Jun 22 01:40:27 2012
@@ -25,38 +25,39 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 /**
- * Shim layer to abstract differences between Hadoop 0.20 and 0.23 (HCATALOG-179).
- * This mirrors Hive shims, but is kept separate for HCatalog dependencies.
+ * Shim layer to abstract differences between Hadoop 0.20 and 0.23
+ * (HCATALOG-179). This mirrors Hive shims, but is kept separate for HCatalog
+ * dependencies.
  **/
 public interface HCatHadoopShims {
 
-	public static abstract class Instance {
-		static HCatHadoopShims instance = selectShim();
-		public static HCatHadoopShims get() {
-			return instance;
-		}
-
-		private static HCatHadoopShims selectShim() {
-			// piggyback on Hive's detection logic
-			String major = ShimLoader.getMajorVersion();
-			String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
-			if (major.startsWith("0.23")) {
-				shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
-			}
-			try {
-				Class<? extends HCatHadoopShims> clasz =
-						Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
-				return clasz.newInstance();
-			} catch (Exception e) {
-				throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
-			}
-		}
-	}
+  public static abstract class Instance {
+    static HCatHadoopShims instance = selectShim();
 
-    public TaskAttemptContext createTaskAttemptContext(Configuration conf,
-                                TaskAttemptID taskId);
+    public static HCatHadoopShims get() {
+      return instance;
+    }
+
+    private static HCatHadoopShims selectShim() {
+      // piggyback on Hive's detection logic
+      String major = ShimLoader.getMajorVersion();
+      String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
+      if (major.startsWith("0.23")) {
+        shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
+      }
+      try {
+        Class<? extends HCatHadoopShims> clasz = Class.forName(shimFQN)
+            .asSubclass(HCatHadoopShims.class);
+        return clasz.newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
+      }
+    }
+  }
 
-    public JobContext createJobContext(Configuration conf,
-            JobID jobId);
+  public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+      TaskAttemptID taskId);
+
+  public JobContext createJobContext(Configuration conf, JobID jobId);
 
 }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java Fri Jun 22 01:40:27 2012
@@ -57,7 +57,7 @@ public class TestSemanticAnalysis extend
   @Override
   protected void setUp() throws Exception {
 
-	System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName());
+  System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName());
     HiveConf hcatConf = new HiveConf(this.getClass());
     hcatConf.set(ConfVars.PREEXECHOOKS.varname, "");
     hcatConf.set(ConfVars.POSTEXECHOOKS.varname, "");
@@ -77,14 +77,14 @@ public class TestSemanticAnalysis extend
   private final String tblName = "junit_sem_analysis";
 
   public void testDescDB() throws CommandNeedRetryException, IOException {
-	hcatDriver.run("drop database mydb cascade");
-	assertEquals(0, hcatDriver.run("create database mydb").getResponseCode());
-	CommandProcessorResponse resp = hcatDriver.run("describe database mydb");
-	assertEquals(0, resp.getResponseCode());
-	ArrayList<String> result = new ArrayList<String>();
-	hcatDriver.getResults(result);
-	assertTrue(result.get(0).contains("mydb.db"));
-	hcatDriver.run("drop database mydb cascade");
+  hcatDriver.run("drop database mydb cascade");
+  assertEquals(0, hcatDriver.run("create database mydb").getResponseCode());
+  CommandProcessorResponse resp = hcatDriver.run("describe database mydb");
+  assertEquals(0, resp.getResponseCode());
+  ArrayList<String> result = new ArrayList<String>();
+  hcatDriver.getResults(result);
+  assertTrue(result.get(0).contains("mydb.db"));
+  hcatDriver.run("drop database mydb cascade");
   }
 
   public void testCreateTblWithLowerCasePartNames() throws CommandNeedRetryException, MetaException, TException, NoSuchObjectException{
@@ -292,8 +292,8 @@ public class TestSemanticAnalysis extend
 
     hcatDriver.run("drop table junit_sem_analysis");
     query =  "create table junit_sem_analysis (a int) partitioned by (b string)  stored as " +
-    		"INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " +
-    		"'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' ";
+        "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " +
+        "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' ";
     assertEquals(0,hcatDriver.run(query).getResponseCode());
 
     Table tbl = msc.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName);