You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2012/06/22 01:40:29 UTC
svn commit: r1352738 [1/2] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/cli/ src/java/org/apache/hcatalog/data/schema/
src/java/org/apache/hcatalog/data/transfer/
src/java/org/apache/hcatalog/data/transfer/impl/ src/java/org/apache/h...
Author: khorgath
Date: Fri Jun 22 01:40:27 2012
New Revision: 1352738
URL: http://svn.apache.org/viewvc?rev=1352738&view=rev
Log:
HCATALOG-424 Code cleanup of tabs (khorgath)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Jun 22 01:40:27 2012
@@ -26,6 +26,8 @@ Trunk (unreleased changes)
HCAT-328 HCatLoader should report its input size so pig can estimate the number of reducers (traviscrawford via gates)
IMPROVEMENTS
+ HCAT-424 Code cleanup of tabs (khorgath)
+
HCAT-331 Update HCatalog to junit 4 (traviscrawford via khorgath)
HCAT-414 More HCat e2e tests (khorgath via gates)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/HCatCli.java Fri Jun 22 01:40:27 2012
@@ -56,11 +56,11 @@ public class HCatCli {
@SuppressWarnings("static-access")
public static void main(String[] args) {
- try {
- LogUtils.initHiveLog4j();
- } catch (LogInitializationException e) {
+ try {
+ LogUtils.initHiveLog4j();
+ } catch (LogInitializationException e) {
- }
+ }
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
@@ -270,7 +270,7 @@ public class HCatCli {
ss.err.println("Failed with exception " + e.getClass().getName() + ":"
+ e.getMessage() + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
- }
+ }
int cret = driver.close();
if (ret == 0) {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java Fri Jun 22 01:40:27 2012
@@ -153,7 +153,7 @@ public class HCatSchemaUtils {
case STRING:
return Type.STRING;
case BINARY:
- return Type.BINARY;
+ return Type.BINARY;
default:
throw new TypeNotPresentException(((PrimitiveTypeInfo)basePrimitiveTypeInfo).getTypeName(), null);
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Fri Jun 22 01:40:27 2012
@@ -27,74 +27,109 @@ import org.apache.hcatalog.data.transfer
import org.apache.hcatalog.data.transfer.state.DefaultStateProvider;
import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes.
+/**
+ * Use this factory to get instances of {@link HCatReader} or {@link HCatWriter}
+ * at master and slave nodes.
*/
public class DataTransferFactory {
- /**
- * This should be called once from master node to obtain an instance of {@link HCatReader}.
- * @param re ReadEntity built using {@link ReadEntity.Builder}
- * @param config any configuration which master node wants to pass to HCatalog
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final ReadEntity re, final Map<String,String> config) {
- // In future, this may examine ReadEntity and/or config to return appropriate HCatReader
- return new HCatInputFormatReader(re, config);
- }
-
- /**
- * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
- * @param split input split obtained at master node
- * @param config configuration obtained at master node
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split, final Configuration config) {
- // In future, this may examine config to return appropriate HCatReader
- return getHCatReader(split, config, DefaultStateProvider.get());
- }
-
- /**
- * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
- * This should be called if an external system has some state to provide to HCatalog.
- * @param split input split obtained at master node
- * @param config configuration obtained at master node
- * @param sp {@link StateProvider}
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) {
- // In future, this may examine config to return appropriate HCatReader
- return new HCatInputFormatReader(split, config, sp);
- }
-
- /** This should be called at master node to obtain an instance of {@link HCatWriter}.
- * @param we WriteEntity built using {@link WriteEntity.Builder}
- * @param config any configuration which master wants to pass to HCatalog
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriteEntity we, final Map<String,String> config) {
- // In future, this may examine WriteEntity and/or config to return appropriate HCatWriter
- return new HCatOutputFormatWriter(we, config);
- }
-
- /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- * @param cntxt {@link WriterContext} obtained at master node
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt) {
- // In future, this may examine context to return appropriate HCatWriter
- return getHCatWriter(cntxt, DefaultStateProvider.get());
- }
-
- /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- * If an external system has some mechanism for providing state to HCatalog, this constructor
- * can be used.
- * @param cntxt {@link WriterContext} obtained at master node
- * @param sp {@link StateProvider}
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) {
- // In future, this may examine context to return appropriate HCatWriter
- return new HCatOutputFormatWriter(cntxt.getConf(), sp);
- }
+ /**
+ * This should be called once from master node to obtain an instance of
+ * {@link HCatReader}.
+ *
+ * @param re
+ * ReadEntity built using {@link ReadEntity.Builder}
+ * @param config
+ * any configuration which master node wants to pass to HCatalog
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final ReadEntity re,
+ final Map<String, String> config) {
+ // In future, this may examine ReadEntity and/or config to return
+ // appropriate HCatReader
+ return new HCatInputFormatReader(re, config);
+ }
+
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config) {
+ // In future, this may examine config to return appropriate HCatReader
+ return getHCatReader(split, config, DefaultStateProvider.get());
+ }
+
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}. This should be called if an external system has some
+ * state to provide to HCatalog.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config, StateProvider sp) {
+ // In future, this may examine config to return appropriate HCatReader
+ return new HCatInputFormatReader(split, config, sp);
+ }
+
+ /**
+ * This should be called at master node to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param we
+ * WriteEntity built using {@link WriteEntity.Builder}
+ * @param config
+ * any configuration which master wants to pass to HCatalog
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriteEntity we,
+ final Map<String, String> config) {
+ // In future, this may examine WriteEntity and/or config to return
+ // appropriate HCatWriter
+ return new HCatOutputFormatWriter(we, config);
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return getHCatWriter(cntxt, DefaultStateProvider.get());
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}. If an external system has some mechanism for providing
+ * state to HCatalog, this method can be used.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt,
+ final StateProvider sp) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Jun 22 01:40:27 2012
@@ -20,35 +20,40 @@ package org.apache.hcatalog.data.transfe
import java.util.Map;
-/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common,
- * so this class contains the common fields.
+/**
+ * This is a base class for
+ * {@link ReadEntity.Builder} / {@link WriteEntity.Builder}.
+ * Many fields in them are common, so this class
+ * contains the common fields.
*/
abstract class EntityBase {
- String region;
- String tableName;
- String dbName;
- Map<String,String> partitionKVs;
-
-
-
- /** Common methods for {@link ReadEntity} and {@link WriteEntity}
- */
-
- abstract static class Entity extends EntityBase{
-
- public String getRegion() {
- return region;
- }
- public String getTableName() {
- return tableName;
- }
- public String getDbName() {
- return dbName;
- }
- public Map<String, String> getPartitionKVs() {
- return partitionKVs;
- }
- }
+ String region;
+ String tableName;
+ String dbName;
+ Map<String, String> partitionKVs;
+
+ /**
+ * Common methods for {@link ReadEntity} and {@link WriteEntity}
+ */
+
+ abstract static class Entity extends EntityBase {
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public Map<String, String> getPartitionKVs() {
+ return partitionKVs;
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Jun 22 01:40:27 2012
@@ -27,65 +27,75 @@ import org.apache.hcatalog.common.HCatEx
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** This abstract class is internal to HCatalog and abstracts away the notion of
- * underlying system from which reads will be done.
+/**
+ * This abstract class is internal to HCatalog and abstracts away the notion of
+ * underlying system from which reads will be done.
*/
-public abstract class HCatReader{
+public abstract class HCatReader {
- /** This should be called at master node to obtain {@link ReaderContext} which then should be
- * serialized and sent to slave nodes.
- * @return {@link ReaderContext}
- * @throws HCatException
- */
- public abstract ReaderContext prepareRead() throws HCatException;
-
- /** This should be called at slave nodes to read {@link HCatRecord}s
- * @return {@link Iterator} of {@link HCatRecord}
- * @throws HCatException
- */
- public abstract Iterator<HCatRecord> read() throws HCatException;
-
- /** This constructor will be invoked by {@link DataTransferFactory} at master node.
- * Don't use this constructor. Instead, use {@link DataTransferFactory}
- * @param re
- * @param config
- */
- protected HCatReader(final ReadEntity re, final Map<String,String> config) {
- this(config);
- this.re = re;
- }
-
- /** This constructor will be invoked by {@link DataTransferFactory} at slave nodes.
- * Don't use this constructor. Instead, use {@link DataTransferFactory}
- * @param config
- * @param sp
- */
-
- protected HCatReader(final Configuration config, StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
-
- protected ReadEntity re; // This will be null at slaves.
- protected Configuration conf;
- protected ReaderContext info;
- protected StateProvider sp; // This will be null at master.
-
- private HCatReader(final Map<String,String> config) {
- Configuration conf = new Configuration();
- if (null != config) {
- for(Entry<String, String> kv : config.entrySet()){
- conf.set(kv.getKey(), kv.getValue());
- }
- }
- this.conf = conf;
- }
-
- public Configuration getConf() {
- if (null == conf) {
- throw new IllegalStateException("HCatReader is not constructed correctly.");
- }
- return conf;
- }
+ /**
+ * This should be called at master node to obtain {@link ReaderContext} which
+ * then should be serialized and sent to slave nodes.
+ *
+ * @return {@link ReaderContext}
+ * @throws HCatException
+ */
+ public abstract ReaderContext prepareRead() throws HCatException;
+
+ /**
+ * This should be called at slave nodes to read {@link HCatRecord}s
+ *
+ * @return {@link Iterator} of {@link HCatRecord}
+ * @throws HCatException
+ */
+ public abstract Iterator<HCatRecord> read() throws HCatException;
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at master
+ * node. Don't use this constructor. Instead, use {@link DataTransferFactory}.
+ *
+ * @param re {@link ReadEntity} describing what to read
+ * @param config configuration entries to pass to HCatalog; may be null
+ */
+ protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+ this(config);
+ this.re = re;
+ }
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at slave
+ * nodes. Don't use this constructor. Instead, use {@link DataTransferFactory}.
+ *
+ * @param config configuration obtained at master node
+ * @param sp {@link StateProvider}
+ */
+
+ protected HCatReader(final Configuration config, StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ protected ReadEntity re; // This will be null at slaves.
+ protected Configuration conf;
+ protected ReaderContext info;
+ protected StateProvider sp; // This will be null at master.
+
+ private HCatReader(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (null != config) {
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ if (null == conf) {
+ throw new IllegalStateException(
+ "HCatReader is not constructed correctly.");
+ }
+ return conf;
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Jun 22 01:40:27 2012
@@ -27,69 +27,87 @@ import org.apache.hcatalog.common.HCatEx
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** This abstraction is internal to HCatalog. This is to facilitate writing to HCatalog from external
- * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory}
+/**
+ * This abstraction is internal to HCatalog. This is to facilitate writing to
+ * HCatalog from external systems. Don't try to instantiate this directly.
+ * Instead, use {@link DataTransferFactory}
*/
public abstract class HCatWriter {
- protected Configuration conf;
- protected WriteEntity we; // This will be null at slave nodes.
- protected WriterContext info;
- protected StateProvider sp;
-
- /** External system should invoke this method exactly once from a master node.
- * @return {@link WriterContext} This should be serialized and sent to slave nodes to
- * construct HCatWriter there.
- * @throws HCatException
- */
- public abstract WriterContext prepareWrite() throws HCatException;
-
- /** This method should be used at slave needs to perform writes.
- * @param recordItr {@link Iterator} records to be written into HCatalog.
- * @throws {@link HCatException}
- */
- public abstract void write(final Iterator<HCatRecord> recordItr) throws HCatException;
-
- /** This method should be called at master node. Primary purpose of this is to do metadata commit.
- * @throws {@link HCatException}
- */
- public abstract void commit(final WriterContext context) throws HCatException;
-
- /** This method should be called at master node. Primary purpose of this is to do cleanups in case
- * of failures.
- * @throws {@link HCatException} *
- */
- public abstract void abort(final WriterContext context) throws HCatException;
-
- /**
- * This constructor will be used at master node
- * @param we WriteEntity defines where in storage records should be written to.
- * @param config Any configuration which external system wants to communicate to HCatalog
- * for performing writes.
- */
- protected HCatWriter(final WriteEntity we, final Map<String,String> config) {
- this(config);
- this.we = we;
- }
-
- /** This constructor will be used at slave nodes.
- * @param config
- */
- protected HCatWriter(final Configuration config, final StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
-
- private HCatWriter(final Map<String,String> config) {
- Configuration conf = new Configuration();
- if(config != null){
- // user is providing config, so it could be null.
- for(Entry<String, String> kv : config.entrySet()){
- conf.set(kv.getKey(), kv.getValue());
- }
- }
+ protected Configuration conf;
+ protected WriteEntity we; // This will be null at slave nodes.
+ protected WriterContext info;
+ protected StateProvider sp;
- this.conf = conf;
- }
+ /**
+ * External system should invoke this method exactly once from a master node.
+ *
+ * @return {@link WriterContext} This should be serialized and sent to slave
+ * nodes to construct HCatWriter there.
+ * @throws HCatException
+ */
+ public abstract WriterContext prepareWrite() throws HCatException;
+
+ /**
+ * This method should be used at slave nodes to perform writes.
+ *
+ * @param recordItr
+ * {@link Iterator} records to be written into HCatalog.
+ * @throws HCatException
+ */
+ public abstract void write(final Iterator<HCatRecord> recordItr)
+ throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do metadata commit.
+ *
+ * @throws HCatException
+ */
+ public abstract void commit(final WriterContext context) throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do cleanups in case of failures.
+ *
+ * @throws HCatException
+ */
+ public abstract void abort(final WriterContext context) throws HCatException;
+
+ /**
+ * This constructor will be used at master node
+ *
+ * @param we
+ * WriteEntity defines where in storage records should be written to.
+ * @param config
+ * Any configuration which external system wants to communicate to
+ * HCatalog for performing writes.
+ */
+ protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+ this(config);
+ this.we = we;
+ }
+
+ /**
+ * This constructor will be used at slave nodes.
+ *
+ * @param config
+ */
+ protected HCatWriter(final Configuration config, final StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ private HCatWriter(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (config != null) {
+ // user is providing config, so it could be null.
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ this.conf = conf;
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Jun 22 01:40:27 2012
@@ -20,66 +20,69 @@ package org.apache.hcatalog.data.transfe
import java.util.Map;
-public class ReadEntity extends EntityBase.Entity{
+public class ReadEntity extends EntityBase.Entity {
- private String filterString;
+ private String filterString;
- /** Don't instantiate {@link ReadEntity} directly. Use, {@link ReadEntity.Builder} instead.
- *
- */
- private ReadEntity() {
- // Not allowed
- }
-
- private ReadEntity(Builder builder) {
-
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- this.filterString = builder.filterString;
- }
-
- public String getFilterString() {
- return this.filterString;
- }
-
- /** This class should be used to build {@link ReadEntity}. It follows builder pattern, letting you build
- * your {@link ReadEntity} with whatever level of detail you want.
- *
- */
- public static class Builder extends EntityBase {
-
- private String filterString;
-
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
-
-
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
- }
-
- public Builder withPartition(final Map<String,String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
-
- public Builder withFilter(String filterString) {
- this.filterString = filterString;
- return this;
- }
-
- public ReadEntity build() {
- return new ReadEntity(this);
- }
- }
+ /**
+ * Don't instantiate {@link ReadEntity} directly. Use
+ * {@link ReadEntity.Builder} instead.
+ *
+ */
+ private ReadEntity() {
+ // Not allowed
+ }
+
+ private ReadEntity(Builder builder) {
+
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ this.filterString = builder.filterString;
+ }
+
+ public String getFilterString() {
+ return this.filterString;
+ }
+
+ /**
+ * This class should be used to build {@link ReadEntity}. It follows builder
+ * pattern, letting you build your {@link ReadEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ private String filterString;
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public Builder withFilter(String filterString) {
+ this.filterString = filterString;
+ return this;
+ }
+
+ public ReadEntity build() {
+ return new ReadEntity(this);
+ }
+ }
}
\ No newline at end of file
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Jun 22 01:40:27 2012
@@ -30,57 +30,59 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hcatalog.mapreduce.HCatSplit;
-/** This class will contain information of different {@link InputSplit} obtained at master node
- * and configuration. This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms.
+/**
+ * This class will contain information of different {@link InputSplit} obtained
+ * at master node and configuration. This class implements
+ * {@link Externalizable} so it can be serialized using standard java
+ * mechanisms.
*/
public class ReaderContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -2656468331739574367L;
- private List<InputSplit> splits;
- private Configuration conf;
-
- public ReaderContext() {
- this.splits = new ArrayList<InputSplit>();
- this.conf = new Configuration();
- }
-
- public void setInputSplits(final List<InputSplit> splits) {
- this.splits = splits;
- }
-
- public List<InputSplit> getSplits() {
- return splits;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- out.writeInt(splits.size());
- for (InputSplit split : splits) {
- ((HCatSplit)split).write(out);
- }
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- int numOfSplits = in.readInt();
- for (int i=0 ; i < numOfSplits; i++) {
- HCatSplit split = new HCatSplit();
- split.readFields(in);
- splits.add(split);
- }
- }
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContext() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
+
+ public void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
+
+ public List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Jun 22 01:40:27 2012
@@ -20,52 +20,55 @@ package org.apache.hcatalog.data.transfe
import java.util.Map;
-public class WriteEntity extends EntityBase.Entity{
+public class WriteEntity extends EntityBase.Entity {
- /** Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to build
- * {@link WriteEntity}.
- */
-
- private WriteEntity() {
- // Not allowed.
- }
-
- private WriteEntity(Builder builder) {
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- }
-
- /** This class should be used to build {@link WriteEntity}. It follows builder pattern, letting you build
- * your {@link WriteEntity} with whatever level of detail you want.
- *
- */
- public static class Builder extends EntityBase{
-
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
-
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
- }
-
- public Builder withPartition(final Map<String,String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
-
- public WriteEntity build() {
- return new WriteEntity(this);
- }
-
- }
/**
 * Do not instantiate {@link WriteEntity} directly; use {@link Builder} to
 * build one. This constructor is private so direct construction is
 * impossible.
 */
private WriteEntity() {
  // Intentionally empty: direct construction is not allowed.
}
+
/**
 * Copies every attribute collected by {@code builder} into the new entity.
 */
private WriteEntity(Builder builder) {
  this.partitionKVs = builder.partitionKVs;
  this.tableName = builder.tableName;
  this.dbName = builder.dbName;
  this.region = builder.region;
}
+
+ /**
+ * This class should be used to build {@link WriteEntity}. It follows builder
+ * pattern, letting you build your {@link WriteEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public WriteEntity build() {
+ return new WriteEntity(this);
+ }
+
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Jun 22 01:40:27 2012
@@ -26,38 +26,39 @@ import java.io.ObjectOutput;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-/** This contains information obtained at master node to help prepare slave nodes for writer.
- * This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms. Master should serialize it and make it available to slaves to
- * prepare for writes.
+/**
+ * This contains information obtained at master node to help prepare slave nodes
+ * for writer. This class implements {@link Externalizable} so it can be
+ * serialized using standard java mechanisms. Master should serialize it and
+ * make it available to slaves to prepare for writes.
*/
-public class WriterContext implements Externalizable, Configurable{
+public class WriterContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -5899374262971611840L;
- private Configuration conf;
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
- public WriterContext() {
- conf = new Configuration();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- this.conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- }
/**
 * Creates a context holding a fresh default {@link Configuration}.
 */
public WriterContext() {
  this.conf = new Configuration();
}
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Jun 22 01:40:27 2012
@@ -40,98 +40,102 @@ import org.apache.hcatalog.data.transfer
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
-/** This reader reads via {@link HCatInputFormat}
+/**
+ * This reader reads via {@link HCatInputFormat}
*
*/
-public class HCatInputFormatReader extends HCatReader{
+public class HCatInputFormatReader extends HCatReader {
- private InputSplit split;
-
- public HCatInputFormatReader(InputSplit split, Configuration config, StateProvider sp) {
- super(config, sp);
- this.split = split;
- }
-
- public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
- super(info,config);
- }
-
- @Override
- public ReaderContext prepareRead() throws HCatException {
-
- try {
- Job job = new Job(conf);
- InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString());
- HCatInputFormat.setInput(job, jobInfo);
- HCatInputFormat hcif = new HCatInputFormat();
- ReaderContext cntxt = new ReaderContext();
- cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null)));
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED,e);
- }
- }
-
- @Override
- public Iterator<HCatRecord> read() throws HCatException {
-
- HCatInputFormat inpFmt = new HCatInputFormat();
- RecordReader<WritableComparable, HCatRecord> rr;
- try {
- TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID());
- rr = inpFmt.createRecordReader(split, cntxt);
- rr.initialize(split, cntxt);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- return new HCatRecordItr(rr);
- }
-
-
- private static class HCatRecordItr implements Iterator<HCatRecord>{
-
- private RecordReader<WritableComparable, HCatRecord> curRecReader;
-
- HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
- curRecReader = rr;
- }
-
- @Override
- public boolean hasNext(){
- try {
- boolean retVal = curRecReader.nextKeyValue();
- if (retVal) {
- return true;
- }
- // if its false, we need to close recordReader.
- curRecReader.close();
- return false;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public HCatRecord next() {
- try {
- return curRecReader.getCurrentValue();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not allowed");
- }
- }
+ private InputSplit split;
+
+ public HCatInputFormatReader(InputSplit split, Configuration config,
+ StateProvider sp) {
+ super(config, sp);
+ this.split = split;
+ }
+
/**
 * Creates a reader for the given entity; this is the form used to call
 * prepareRead. No split is set, so read() is not meant to be used on it.
 *
 * @param entity what to read (database, table, filter)
 * @param settings configuration key/values passed to the base reader
 */
public HCatInputFormatReader(ReadEntity entity, Map<String, String> settings) {
  super(entity, settings);
}
+
+ @Override
+ public ReaderContext prepareRead() throws HCatException {
+
+ try {
+ Job job = new Job(conf);
+ InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
+ re.getTableName(), re.getFilterString());
+ HCatInputFormat.setInput(job, jobInfo);
+ HCatInputFormat hcif = new HCatInputFormat();
+ ReaderContext cntxt = new ReaderContext();
+ cntxt.setInputSplits(hcif.getSplits(new JobContext(
+ job.getConfiguration(), null)));
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public Iterator<HCatRecord> read() throws HCatException {
+
+ HCatInputFormat inpFmt = new HCatInputFormat();
+ RecordReader<WritableComparable, HCatRecord> rr;
+ try {
+ TaskAttemptContext cntxt = new TaskAttemptContext(conf,
+ new TaskAttemptID());
+ rr = inpFmt.createRecordReader(split, cntxt);
+ rr.initialize(split, cntxt);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ return new HCatRecordItr(rr);
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+ HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+ curRecReader = rr;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ boolean retVal = curRecReader.nextKeyValue();
+ if (retVal) {
+ return true;
+ }
+ // if its false, we need to close recordReader.
+ curRecReader.close();
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public HCatRecord next() {
+ try {
+ return curRecReader.getCurrentValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not allowed");
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Jun 22 01:40:27 2012
@@ -42,112 +42,121 @@ import org.apache.hcatalog.data.transfer
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
-/** This writer writes via {@link HCatOutputFormat}
+/**
+ * This writer writes via {@link HCatOutputFormat}
*
*/
public class HCatOutputFormatWriter extends HCatWriter {
- public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
- super(we, config);
- }
-
- public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
- super(config, sp);
- }
-
- @Override
- public WriterContext prepareWrite() throws HCatException {
- OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs());
- Job job;
- try {
- job = new Job(conf);
- HCatOutputFormat.setOutput(job, jobInfo);
- HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
- HCatOutputFormat outFormat = new HCatOutputFormat();
- outFormat.checkOutputSpecs(job);
- outFormat.getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).setupJob(job);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- WriterContext cntxt = new WriterContext();
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- }
-
- @Override
- public void write(Iterator<HCatRecord> recordItr) throws HCatException {
-
- int id = sp.getId();
- setVarsInConf(id);
- HCatOutputFormat outFormat = new HCatOutputFormat();
- TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), id));
- OutputCommitter committer = null;
- RecordWriter<WritableComparable<?>, HCatRecord> writer;
- try {
- committer = outFormat.getOutputCommitter(cntxt);
- committer.setupTask(cntxt);
- writer = outFormat.getRecordWriter(cntxt);
- while(recordItr.hasNext()){
- HCatRecord rec = recordItr.next();
- writer.write(null, rec);
- }
- writer.close(cntxt);
- if(committer.needsTaskCommit(cntxt)){
- committer.commitTask(cntxt);
- }
- } catch (IOException e) {
- if(null != committer) {
- try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
- }
- }
- throw new HCatException("Failed while writing",e);
- } catch (InterruptedException e) {
- if(null != committer) {
- try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
- }
- }
- throw new HCatException("Failed while writing", e);
- }
- }
-
- @Override
- public void commit(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
- .commitJob(new JobContext(context.getConf(), null));
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
-
- @Override
- public void abort(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
- .abortJob(new JobContext(context.getConf(), null),State.FAILED);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
-
- private void setVarsInConf(int id) {
-
- // Following two config keys are required by FileOutputFormat to work correctly.
- // In usual case of Hadoop, JobTracker will set these before launching tasks.
- // Since there is no jobtracker here, we set it ourself.
- conf.setInt("mapred.task.partition", id);
- conf.set("mapred.task.id", "attempt__0000_r_000000_"+id);
- }
/**
 * Creates a writer for the given entity; this is the form used to call
 * prepareWrite, commit and abort.
 *
 * @param entity where to write (database, table, partition key/values)
 * @param settings configuration key/values passed to the base writer
 */
public HCatOutputFormatWriter(WriteEntity entity, Map<String, String> settings) {
  super(entity, settings);
}
+
/**
 * Creates a writer from a configuration (for example one taken from a
 * WriterContext) and a state provider that supplies the task id for write().
 *
 * @param jobConf configuration passed to the base writer
 * @param provider state provider passed to the base writer
 */
public HCatOutputFormatWriter(Configuration jobConf, StateProvider provider) {
  super(jobConf, provider);
}
+
+ @Override
+ public WriterContext prepareWrite() throws HCatException {
+ OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+ we.getTableName(), we.getPartitionKVs());
+ Job job;
+ try {
+ job = new Job(conf);
+ HCatOutputFormat.setOutput(job, jobInfo);
+ HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ outFormat.checkOutputSpecs(job);
+ outFormat.getOutputCommitter(
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()))
+ .setupJob(job);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ WriterContext cntxt = new WriterContext();
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ }
+
+ @Override
+ public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+ int id = sp.getId();
+ setVarsInConf(id);
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(
+ new TaskID(), id));
+ OutputCommitter committer = null;
+ RecordWriter<WritableComparable<?>, HCatRecord> writer;
+ try {
+ committer = outFormat.getOutputCommitter(cntxt);
+ committer.setupTask(cntxt);
+ writer = outFormat.getRecordWriter(cntxt);
+ while (recordItr.hasNext()) {
+ HCatRecord rec = recordItr.next();
+ writer.write(null, rec);
+ }
+ writer.close(cntxt);
+ if (committer.needsTaskCommit(cntxt)) {
+ committer.commitTask(cntxt);
+ }
+ } catch (IOException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ } catch (InterruptedException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ }
+ }
+
+ @Override
+ public void commit(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(
+ new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+ .commitJob(new JobContext(context.getConf(), null));
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public void abort(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(
+ new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+ .abortJob(new JobContext(context.getConf(), null), State.FAILED);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ private void setVarsInConf(int id) {
+
+ // Following two config keys are required by FileOutputFormat to work
+ // correctly.
+ // In usual case of Hadoop, JobTracker will set these before launching
+ // tasks.
+ // Since there is no jobtracker here, we set it ourself.
+ conf.setInt("mapred.task.partition", id);
+ conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Jun 22 01:40:27 2012
@@ -21,26 +21,27 @@ package org.apache.hcatalog.data.transfe
import java.text.NumberFormat;
import java.util.Random;
-
public class DefaultStateProvider implements StateProvider {
- /** Default implementation. Here, ids are generated randomly.
- */
- @Override
- public int getId() {
-
- NumberFormat numberFormat = NumberFormat.getInstance();
- numberFormat.setMinimumIntegerDigits(5);
- numberFormat.setGroupingUsed(false);
- return Integer.parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
- }
+ /**
+ * Default implementation. Here, ids are generated randomly.
+ */
+ @Override
+ public int getId() {
+
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ return Integer
+ .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+ }
+
+ private static StateProvider sp;
- private static StateProvider sp;
-
- public static synchronized StateProvider get() {
- if (null == sp) {
- sp = new DefaultStateProvider();
- }
- return sp;
- }
+ public static synchronized StateProvider get() {
+ if (null == sp) {
+ sp = new DefaultStateProvider();
+ }
+ return sp;
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Jun 22 01:40:27 2012
@@ -21,14 +21,17 @@ package org.apache.hcatalog.data.transfe
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.TaskTracker;
-/** If external system wants to communicate any state to slaves, they can do so via this interface.
- * One example of this in case of Map-Reduce is ids assigned by {@link JobTracker} to
- * {@link TaskTracker}
+/**
+ * If external system wants to communicate any state to slaves, they can do so
+ * via this interface. One example of this in case of Map-Reduce is ids assigned
+ * by {@link JobTracker} to {@link TaskTracker}
*/
public interface StateProvider {
- /** This method should return id assigned to slave node.
- * @return id
- */
- public int getId();
+ /**
+ * This method should return id assigned to slave node.
+ *
+ * @return id
+ */
+ public int getId();
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Fri Jun 22 01:40:27 2012
@@ -64,288 +64,310 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hcatalog.common.HCatConstants;
/**
- * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}
- * It sends message on two type of topics. One has name of form dbName.tblName
- * On this topic, two kind of messages are sent: add/drop partition and
- * finalize_partition message.
- * Second topic has name "HCAT" and messages sent on it are: add/drop database
- * and add/drop table.
- * All messages also has a property named "HCAT_EVENT" set on them whose value
- * can be used to configure message selector on subscriber side.
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends
+ * message on two type of topics. One has name of form dbName.tblName On this
+ * topic, two kind of messages are sent: add/drop partition and
+ * finalize_partition message. Second topic has name "HCAT" and messages sent on
+ * it are: add/drop database and add/drop table. All messages also has a
+ * property named "HCAT_EVENT" set on them whose value can be used to configure
+ * message selector on subscriber side.
*/
-public class NotificationListener extends MetaStoreEventListener{
+public class NotificationListener extends MetaStoreEventListener {
- private static final Log LOG = LogFactory.getLog(NotificationListener.class);
- protected Session session;
- protected Connection conn;
-
- /**
- * Create message bus connection and session in constructor.
- */
- public NotificationListener(final Configuration conf) {
-
- super(conf);
- createConnection();
- }
-
- private static String getTopicName(Partition partition,
- ListenerEvent partitionEvent) throws MetaException {
- try {
- return partitionEvent.getHandler()
- .get_table(partition.getDbName(), partition.getTableName())
- .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
- } catch (NoSuchObjectException e) {
- throw new MetaException(e.toString());
- }
- }
-
- @Override
- public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
- // Subscriber can get notification of newly add partition in a
- // particular table by listening on a topic named "dbName.tableName"
- // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
- if(partitionEvent.getStatus()){
private static final Log LOG = LogFactory.getLog(NotificationListener.class);

/** Message-bus session; send() treats null as "no connection established yet". */
protected Session session;

/** Message-bus connection behind {@link #session}. */
protected Connection conn;

/**
 * Creates the listener and immediately attempts to open the message-bus
 * connection and session.
 *
 * @param conf configuration passed to the base listener
 */
public NotificationListener(final Configuration conf) {
  super(conf);
  createConnection();
}
+
+ private static String getTopicName(Partition partition,
+ ListenerEvent partitionEvent) throws MetaException {
+ try {
+ return partitionEvent.getHandler()
+ .get_table(partition.getDbName(), partition.getTableName())
+ .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+ } catch (NoSuchObjectException e) {
+ throw new MetaException(e.toString());
+ }
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent)
+ throws MetaException {
+ // Subscriber can get notification of newly add partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
+ if (partitionEvent.getStatus()) {
- Partition partition = partitionEvent.getPartition();
- String topicName = getTopicName(partition, partitionEvent);
+ Partition partition = partitionEvent.getPartition();
+ String topicName = getTopicName(partition, partitionEvent);
if (topicName != null && !topicName.equals("")) {
- send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
- }
- else {
- LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName()
- + "." + partition.getTableName()
+ send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ + partition.getDbName()
+ + "."
+ + partition.getTableName()
+ " To enable notifications for this table, please do alter table set properties ("
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ "=<dbname>.<tablename>) or whatever you want topic name to be.");
}
- }
+ }
- }
+ }
- @Override
- public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- // Subscriber can get notification of dropped partition in a
- // particular table by listening on a topic named "dbName.tableName"
- // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
-
- // Datanucleus throws NPE when we try to serialize a partition object
- // retrieved from metastore. To workaround that we reset following objects
-
- if(partitionEvent.getStatus()){
- Partition partition = partitionEvent.getPartition();
- StorageDescriptor sd = partition.getSd();
- sd.setBucketCols(new ArrayList<String>());
- sd.setSortCols(new ArrayList<Order>());
- sd.setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- String topicName = getTopicName(partition, partitionEvent);
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent)
+ throws MetaException {
+ // Subscriber can get notification of dropped partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
+
+ // Datanucleus throws NPE when we try to serialize a partition object
+ // retrieved from metastore. To workaround that we reset following objects
+
+ if (partitionEvent.getStatus()) {
+ Partition partition = partitionEvent.getPartition();
+ StorageDescriptor sd = partition.getSd();
+ sd.setBucketCols(new ArrayList<String>());
+ sd.setSortCols(new ArrayList<Order>());
+ sd.setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ String topicName = getTopicName(partition, partitionEvent);
if (topicName != null && !topicName.equals("")) {
- send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
- }
- else {
- LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName()
- + "." + partition.getTableName()
+ send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ + partition.getDbName()
+ + "."
+ + partition.getTableName()
+ " To enable notifications for this table, please do alter table set properties ("
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ "=<dbname>.<tablename>) or whatever you want topic name to be.");
}
- }
- }
+ }
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+ throws MetaException {
+ // Subscriber can get notification about addition of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_DATABASE"
+ if (dbEvent.getStatus())
+ send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+ .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+ // Subscriber can get notification about drop of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_DATABASE"
+ if (dbEvent.getStatus())
+ send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+ .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about addition of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_TABLE"
+ if (tableEvent.getStatus()) {
+ Table tbl = tableEvent.getTable();
+ HMSHandler handler = tableEvent.getHandler();
+ HiveConf conf = handler.getHiveConf();
+ Table newTbl;
+ try {
+ newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
+ .deepCopy();
+ newTbl.getParameters().put(
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
+ getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+ + newTbl.getTableName().toLowerCase());
+ handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
+ } catch (InvalidOperationException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ } catch (NoSuchObjectException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ }
+ send(newTbl, getTopicPrefix(conf) + "."
+ + newTbl.getDbName().toLowerCase(),
+ HCatConstants.HCAT_ADD_TABLE_EVENT);
+ }
+ }
+
+ private String getTopicPrefix(HiveConf conf) {
+ return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
+ HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ }
+
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about drop of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_TABLE"
+
+ // Datanucleus throws NPE when we try to serialize a table object
+ // retrieved from metastore. To workaround that we reset following objects
+
+ if (tableEvent.getStatus()) {
+ Table table = tableEvent.getTable();
+ StorageDescriptor sd = table.getSd();
+ sd.setBucketCols(new ArrayList<String>());
+ sd.setSortCols(new ArrayList<Order>());
+ sd.setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
+ + table.getDbName().toLowerCase(),
+ HCatConstants.HCAT_DROP_TABLE_EVENT);
+ }
+ }
+
+ /**
+ * @param msgBody
+ * is the metastore object. It is sent in full such that if
+ * subscriber is really interested in details, it can reconstruct it
+ * fully. In case of finalize_partition message this will be string
+ * specification of the partition.
+ * @param topicName
+ * is the name on message broker on which message is sent.
+ * @param event
+ * is the value of HCAT_EVENT property in message. It can be used to
+ * select messages in client side.
+ */
+ protected void send(Object msgBody, String topicName, String event) {
+
+ try {
+
+ Destination topic = null;
+ if (null == session) {
+ // this will happen, if we never able to establish a connection.
+ createConnection();
+ if (null == session) {
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "
+ + topicName + " event: " + event);
+ return;
+ }
+ }
+ try {
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ topic = session.createTopic(topicName);
+ } catch (IllegalStateException ise) {
+ // this will happen if we were able to establish connection once, but
+ // its no longer valid,
+ // ise is thrown, catch it and retry.
+ LOG.error("Seems like connection is lost. Retrying", ise);
+ createConnection();
+ topic = session.createTopic(topicName);
+ }
+ if (null == topic) {
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "
+ + topicName + " event: " + event);
+ return;
+ }
+ MessageProducer producer = session.createProducer(topic);
+ Message msg;
+ if (msgBody instanceof Map) {
+ MapMessage mapMsg = session.createMapMessage();
+ Map<String, String> incomingMap = (Map<String, String>) msgBody;
+ for (Entry<String, String> partCol : incomingMap.entrySet()) {
+ mapMsg.setString(partCol.getKey(), partCol.getValue());
+ }
+ msg = mapMsg;
+ } else {
+ msg = session.createObjectMessage((Serializable) msgBody);
+ }
+
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+ producer.send(msg);
+ // Message must be transacted before we return.
+ session.commit();
+ } catch (Exception e) {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName + " event: "
+ + event, e);
+ }
+ }
+
+ protected void createConnection() {
+
+ Context jndiCntxt;
+ try {
+ jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
+ .lookup("ConnectionFactory");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ conn.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException jmse) {
+ LOG.error(jmse);
+ }
+ });
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. "
+ + "Please make sure file named 'jndi.properties' is in "
+ + "classpath and contains appropriate key-value pairs.", e);
+ } catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus", e);
+ } catch (Throwable t) {
+ LOG.error("Unable to connect to JMS provider", t);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ // Close the connection before dying.
+ try {
+ if (null != session)
+ session.close();
+ if (conn != null) {
+ conn.close();
+ }
- @Override
- public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
- // Subscriber can get notification about addition of a database in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_ADD_DATABASE"
- if(dbEvent.getStatus())
- send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT);
- }
-
- @Override
- public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
- // Subscriber can get notification about drop of a database in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_DROP_DATABASE"
- if(dbEvent.getStatus())
- send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT);
- }
-
- @Override
- public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
- // Subscriber can get notification about addition of a table in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_ADD_TABLE"
- if(tableEvent.getStatus()){
- Table tbl = tableEvent.getTable();
- HMSHandler handler = tableEvent.getHandler();
- HiveConf conf = handler.getHiveConf();
- Table newTbl;
- try {
- newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy();
- newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
- getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase()
- +"." + newTbl.getTableName().toLowerCase());
- handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
- } catch (InvalidOperationException e) {
- MetaException me = new MetaException(e.toString());
- me.initCause(e);
- throw me;
- } catch (NoSuchObjectException e) {
- MetaException me = new MetaException(e.toString());
- me.initCause(e);
- throw me;
- }
- send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
- }
- }
-
- private String getTopicPrefix(HiveConf conf){
- return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
- }
-
- @Override
- public void onDropTable(DropTableEvent tableEvent) throws MetaException {
- // Subscriber can get notification about drop of a table in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_DROP_TABLE"
-
- // Datanucleus throws NPE when we try to serialize a table object
- // retrieved from metastore. To workaround that we reset following objects
-
- if(tableEvent.getStatus()){
- Table table = tableEvent.getTable();
- StorageDescriptor sd = table.getSd();
- sd.setBucketCols(new ArrayList<String>());
- sd.setSortCols(new ArrayList<Order>());
- sd.setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName().toLowerCase(), HCatConstants.HCAT_DROP_TABLE_EVENT);
- }
- }
-
- /**
- * @param msgBody is the metastore object. It is sent in full such that
- * if subscriber is really interested in details, it can reconstruct it fully.
- * In case of finalize_partition message this will be string specification of
- * the partition.
- * @param topicName is the name on message broker on which message is sent.
- * @param event is the value of HCAT_EVENT property in message. It can be
- * used to select messages in client side.
- */
- protected void send(Object msgBody, String topicName, String event){
-
- try{
-
- Destination topic = null;
- if(null == session){
- // this will happen, if we never able to establish a connection.
- createConnection();
- if (null == session){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: "+
- topicName + " event: "+event);
- return;
- }
- }
- try{
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise){
- // this will happen if we were able to establish connection once, but its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
- }
- if (null == topic){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: "+
- topicName + " event: "+event);
- return;
- }
- MessageProducer producer = session.createProducer(topic);
- Message msg;
- if (msgBody instanceof Map){
- MapMessage mapMsg = session.createMapMessage();
- Map<String,String> incomingMap = (Map<String,String>)msgBody;
- for (Entry<String,String> partCol : incomingMap.entrySet()){
- mapMsg.setString(partCol.getKey(), partCol.getValue());
- }
- msg = mapMsg;
- }
- else {
- msg = session.createObjectMessage((Serializable)msgBody);
- }
-
- msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
- producer.send(msg);
- // Message must be transacted before we return.
- session.commit();
- } catch(Exception e){
- // Gobble up the exception. Message delivery is best effort.
- LOG.error("Failed to send message on topic: "+topicName +
- " event: "+event , e);
- }
- }
-
- protected void createConnection(){
-
- Context jndiCntxt;
- try {
- jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
- Connection conn = connFac.createConnection();
- conn.start();
- conn.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException jmse) {
- LOG.error(jmse);
- }
- });
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- } catch (NamingException e) {
- LOG.error("JNDI error while setting up Message Bus connection. " +
- "Please make sure file named 'jndi.properties' is in " +
- "classpath and contains appropriate key-value pairs.",e);
- } catch (JMSException e) {
- LOG.error("Failed to initialize connection to message bus",e);
- } catch(Throwable t){
- LOG.error("Unable to connect to JMS provider",t);
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- // Close the connection before dying.
- try {
- if (null != session)
- session.close();
- if(conn != null) {
- conn.close();
- }
-
- } catch (Exception ignore) {
- LOG.info("Failed to close message bus connection.", ignore);
- }
- }
-
- @Override
- public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
- throws MetaException {
- if(lpde.getStatus())
- send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
- }
-
- @Override
- public void onAlterPartition(AlterPartitionEvent ape) throws MetaException{
- //no-op
- }
-
- @Override
- public void onAlterTable(AlterTableEvent ate) throws MetaException {
- // no-op
- }
+ } catch (Exception ignore) {
+ LOG.info("Failed to close message bus connection.", ignore);
+ }
+ }
+
+ @Override
+ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+ throws MetaException {
+ if (lpde.getStatus())
+ send(
+ lpde.getPartitionName(),
+ lpde.getTable().getParameters()
+ .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
+ HCatConstants.HCAT_PARTITION_DONE_EVENT);
+ }
+
+ @Override
+ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent ate) throws MetaException {
+ // no-op
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java Fri Jun 22 01:40:27 2012
@@ -166,7 +166,7 @@ public abstract class HCatBaseStorer ext
return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
case DataType.BYTEARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+ return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
case DataType.BAG:
Schema bagSchema = fSchema.schema;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Jun 22 01:40:27 2012
@@ -156,7 +156,7 @@ public class HCatStorer extends HCatBase
//Calling it from here so that the partition publish happens.
//This call needs to be removed after MAPREDUCE-1447 is fixed.
getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
- job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+ job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
} catch (IOException e) {
throw new IOException("Failed to cleanup job",e);
} catch (InterruptedException e) {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java Fri Jun 22 01:40:27 2012
@@ -91,7 +91,7 @@ public class StorageDelegationAuthorizat
//else we do not have anything to delegate to
throw new HiveException(String.format("Storage Handler for table:%s is not an instance " +
- "of HCatStorageHandler", table.getTableName()));
+ "of HCatStorageHandler", table.getTableName()));
}
} else {
//return an authorizer for HDFS
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Fri Jun 22 01:40:27 2012
@@ -25,38 +25,39 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
- * Shim layer to abstract differences between Hadoop 0.20 and 0.23 (HCATALOG-179).
- * This mirrors Hive shims, but is kept separate for HCatalog dependencies.
+ * Shim layer to abstract differences between Hadoop 0.20 and 0.23
+ * (HCATALOG-179). This mirrors Hive shims, but is kept separate for HCatalog
+ * dependencies.
**/
public interface HCatHadoopShims {
- public static abstract class Instance {
- static HCatHadoopShims instance = selectShim();
- public static HCatHadoopShims get() {
- return instance;
- }
-
- private static HCatHadoopShims selectShim() {
- // piggyback on Hive's detection logic
- String major = ShimLoader.getMajorVersion();
- String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
- if (major.startsWith("0.23")) {
- shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
- }
- try {
- Class<? extends HCatHadoopShims> clasz =
- Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
- return clasz.newInstance();
- } catch (Exception e) {
- throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
- }
- }
- }
+ public static abstract class Instance {
+ static HCatHadoopShims instance = selectShim();
- public TaskAttemptContext createTaskAttemptContext(Configuration conf,
- TaskAttemptID taskId);
+ public static HCatHadoopShims get() {
+ return instance;
+ }
+
+ private static HCatHadoopShims selectShim() {
+ // piggyback on Hive's detection logic
+ String major = ShimLoader.getMajorVersion();
+ String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
+ if (major.startsWith("0.23")) {
+ shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
+ }
+ try {
+ Class<? extends HCatHadoopShims> clasz = Class.forName(shimFQN)
+ .asSubclass(HCatHadoopShims.class);
+ return clasz.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
+ }
+ }
+ }
- public JobContext createJobContext(Configuration conf,
- JobID jobId);
+ public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId);
+
+ public JobContext createJobContext(Configuration conf, JobID jobId);
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java Fri Jun 22 01:40:27 2012
@@ -57,7 +57,7 @@ public class TestSemanticAnalysis extend
@Override
protected void setUp() throws Exception {
- System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName());
+ System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName());
HiveConf hcatConf = new HiveConf(this.getClass());
hcatConf.set(ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(ConfVars.POSTEXECHOOKS.varname, "");
@@ -77,14 +77,14 @@ public class TestSemanticAnalysis extend
private final String tblName = "junit_sem_analysis";
public void testDescDB() throws CommandNeedRetryException, IOException {
- hcatDriver.run("drop database mydb cascade");
- assertEquals(0, hcatDriver.run("create database mydb").getResponseCode());
- CommandProcessorResponse resp = hcatDriver.run("describe database mydb");
- assertEquals(0, resp.getResponseCode());
- ArrayList<String> result = new ArrayList<String>();
- hcatDriver.getResults(result);
- assertTrue(result.get(0).contains("mydb.db"));
- hcatDriver.run("drop database mydb cascade");
+ hcatDriver.run("drop database mydb cascade");
+ assertEquals(0, hcatDriver.run("create database mydb").getResponseCode());
+ CommandProcessorResponse resp = hcatDriver.run("describe database mydb");
+ assertEquals(0, resp.getResponseCode());
+ ArrayList<String> result = new ArrayList<String>();
+ hcatDriver.getResults(result);
+ assertTrue(result.get(0).contains("mydb.db"));
+ hcatDriver.run("drop database mydb cascade");
}
public void testCreateTblWithLowerCasePartNames() throws CommandNeedRetryException, MetaException, TException, NoSuchObjectException{
@@ -292,8 +292,8 @@ public class TestSemanticAnalysis extend
hcatDriver.run("drop table junit_sem_analysis");
query = "create table junit_sem_analysis (a int) partitioned by (b string) stored as " +
- "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " +
- "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' ";
+ "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " +
+ "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' ";
assertEquals(0,hcatDriver.run(query).getResponseCode());
Table tbl = msc.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName);