Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/29 10:01:31 UTC
[1/3] storm git commit: STORM-1030. Hive Connector Fixes.
Repository: storm
Updated Branches:
refs/heads/master 4eaaa0bb1 -> e2ca82f8f
STORM-1030. Hive Connector Fixes.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5f0e91f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5f0e91f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5f0e91f
Branch: refs/heads/master
Commit: e5f0e91fcf7d377817982efd80f6e03d73dc371b
Parents: c2cf3be
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Nov 9 16:40:34 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Mar 28 13:05:52 2016 -0700
----------------------------------------------------------------------
.../org/apache/storm/hive/bolt/HiveBolt.java | 147 ++++++++------
.../apache/storm/hive/common/HiveOptions.java | 8 +-
.../org/apache/storm/hive/common/HiveUtils.java | 11 +-
.../apache/storm/hive/common/HiveWriter.java | 127 ++++++++-----
.../apache/storm/hive/trident/HiveState.java | 38 ++--
.../storm/hive/trident/HiveStateFactory.java | 1 +
.../apache/storm/hive/trident/HiveUpdater.java | 1 +
.../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++++++++++++++++++
.../apache/storm/hive/bolt/HiveTopology.java | 6 +-
.../apache/storm/hive/bolt/TestHiveBolt.java | 11 +-
.../storm/hive/common/TestHiveWriter.java | 13 +-
.../storm/hive/trident/TridentHiveTopology.java | 2 +-
12 files changed, 415 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 0646dcb..ef06e4b 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
@@ -56,14 +56,14 @@ public class HiveBolt extends BaseRichBolt {
private ExecutorService callTimeoutPool;
private transient Timer heartBeatTimer;
private Boolean kerberosEnabled = false;
- private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+ private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
private UserGroupInformation ugi = null;
- HashMap<HiveEndPoint, HiveWriter> allWriters;
+ private Map<HiveEndPoint, HiveWriter> allWriters;
private List<Tuple> tupleBatch;
public HiveBolt(HiveOptions options) {
this.options = options;
- tupleBatch = new LinkedList<>();
+ tupleBatch = new LinkedList<Tuple>();
}
@Override
@@ -87,10 +87,12 @@ public class HiveBolt extends BaseRichBolt {
}
}
this.collector = collector;
- allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+ allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
String timeoutName = "hive-bolt-%d";
this.callTimeoutPool = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+ sendHeartBeat.set(true);
heartBeatTimer = new Timer();
setupHeartBeatTimer();
@@ -105,44 +107,37 @@ public class HiveBolt extends BaseRichBolt {
boolean forceFlush = false;
if (TupleUtils.isTick(tuple)) {
LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize());
- collector.ack(tuple);
forceFlush = true;
- }
- else {
+ } else {
List<String> partitionVals = options.getMapper().mapPartitions(tuple);
HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
HiveWriter writer = getOrCreateWriter(endPoint);
- if (timeToSendHeartBeat.compareAndSet(true, false)) {
- enableHeartBeatOnAllWriters();
- }
writer.write(options.getMapper().mapRecord(tuple));
tupleBatch.add(tuple);
if (tupleBatch.size() >= options.getBatchSize())
forceFlush = true;
}
+
if(forceFlush && !tupleBatch.isEmpty()) {
flushAllWriters(true);
LOG.info("acknowledging tuples after writers flushed ");
- for(Tuple t : tupleBatch)
+ for(Tuple t : tupleBatch) {
collector.ack(t);
+ }
tupleBatch.clear();
}
+ } catch(SerializationError se) {
+ LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple);
+ this.collector.reportError(se);
+ collector.ack(tuple);
} catch(Exception e) {
this.collector.reportError(e);
collector.fail(tuple);
- try {
- flushAndCloseWriters();
- LOG.info("acknowledging tuples after writers flushed and closed");
- for (Tuple t : tupleBatch)
- collector.ack(t);
- tupleBatch.clear();
- } catch (Exception e1) {
- //If flushAndClose fails assume tuples are lost, do not ack
- LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
- for (Tuple t : tupleBatch)
- collector.fail(t);
- tupleBatch.clear();
+ for (Tuple t : tupleBatch) {
+ collector.fail(t);
}
+ tupleBatch.clear();
+ abortAndCloseWriters();
}
}
@@ -153,13 +148,11 @@ public class HiveBolt extends BaseRichBolt {
@Override
public void cleanup() {
+ sendHeartBeat.set(false);
for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
HiveWriter w = entry.getValue();
- LOG.info("Flushing writer to {}", w);
- w.flush(false);
- LOG.info("Closing writer to {}", w);
- w.close();
+ w.flushAndClose();
} catch (Exception ex) {
LOG.warn("Error while closing writer to " + entry.getKey() +
". Exception follows.", ex);
@@ -181,6 +174,7 @@ public class HiveBolt extends BaseRichBolt {
LOG.warn("shutdown interrupted on " + execService, ex);
}
}
+
callTimeoutPool = null;
super.cleanup();
LOG.info("Hive Bolt stopped");
@@ -188,8 +182,14 @@ public class HiveBolt extends BaseRichBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
- return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(),
- options.getTickTupleInterval());
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (options.getTickTupleInterval() > 0)
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
+
+ return conf;
}
private void setupHeartBeatTimer() {
@@ -197,13 +197,26 @@ public class HiveBolt extends BaseRichBolt {
heartBeatTimer.schedule(new TimerTask() {
@Override
public void run() {
- timeToSendHeartBeat.set(true);
- setupHeartBeatTimer();
+ try {
+ if (sendHeartBeat.get()) {
+ LOG.debug("Start sending heartbeat on all writers");
+ sendHeartBeatOnAllWriters();
+ setupHeartBeatTimer();
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to heartbeat on HiveWriter ", e);
+ }
}
}, options.getHeartBeatInterval() * 1000);
}
}
+ private void sendHeartBeatOnAllWriters() throws InterruptedException {
+ for (HiveWriter writer : allWriters.values()) {
+ writer.heartBeat();
+ }
+ }
+
void flushAllWriters(boolean rollToNext)
throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
for(HiveWriter writer: allWriters.values()) {
@@ -211,54 +224,60 @@ public class HiveBolt extends BaseRichBolt {
}
}
- /**
- * Closes all writers and remove them from cache
- * @return number of writers retired
- */
- private void closeAllWriters() {
+ void abortAndCloseWriters() {
try {
- //1) Retire writers
- for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
- entry.getValue().close();
- }
- //2) Clear cache
- allWriters.clear();
- } catch(Exception e) {
- LOG.warn("unable to close writers. ", e);
+ abortAllWriters();
+ closeAllWriters();
+ } catch(Exception ie) {
+ LOG.warn("unable to close hive connections. ", ie);
}
}
- void flushAndCloseWriters() throws Exception {
- try {
- flushAllWriters(false);
- } catch(Exception e) {
- LOG.warn("unable to flush hive writers. ", e);
- throw e;
- } finally {
- closeAllWriters();
+ /**
+ * Abort current Txn on all writers
+ */
+ private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ try {
+ entry.getValue().abort();
+ } catch (Exception e) {
+ LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() +" due to exception ", e);
+ }
}
}
- private void enableHeartBeatOnAllWriters() {
- for (HiveWriter writer : allWriters.values()) {
- writer.setHeartBeatNeeded();
+ /**
+ * Closes all writers and removes them from the cache
+ */
+ private void closeAllWriters() {
+ //1) Retire writers
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch(Exception e) {
+ LOG.warn("unable to close writers. ", e);
+ }
}
+ //2) Clear cache
+ allWriters.clear();
}
private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
throws HiveWriter.ConnectFailure, InterruptedException {
try {
HiveWriter writer = allWriters.get( endPoint );
- if( writer == null ) {
+ if (writer == null) {
LOG.debug("Creating Writer to Hive end point : " + endPoint);
writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
- if(allWriters.size() > options.getMaxOpenConnections()){
+ if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
+ LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(), options.getMaxOpenConnections());
int retired = retireIdleWriters();
if(retired==0) {
retireEldestWriter();
}
}
allWriters.put(endPoint, writer);
+ HiveUtils.logAllHiveEndPoints(allWriters);
}
return writer;
} catch (HiveWriter.ConnectFailure e) {
@@ -271,22 +290,25 @@ public class HiveBolt extends BaseRichBolt {
* Locate writer that has not been used for longest time and retire it
*/
private void retireEldestWriter() {
+ LOG.info("Attempting close eldest writers");
long oldestTimeStamp = System.currentTimeMillis();
HiveEndPoint eldest = null;
for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
- if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+ if (entry.getValue().getLastUsed() < oldestTimeStamp) {
eldest = entry.getKey();
oldestTimeStamp = entry.getValue().getLastUsed();
}
}
try {
LOG.info("Closing least used Writer to Hive end point : " + eldest);
- allWriters.remove(eldest).close();
+ allWriters.remove(eldest).flushAndClose();
} catch (IOException e) {
LOG.warn("Failed to close writer for end point: " + eldest, e);
} catch (InterruptedException e) {
LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.warn("Error while attempting to close writer for end point: " + eldest, e);
}
}
@@ -295,6 +317,7 @@ public class HiveBolt extends BaseRichBolt {
* @return number of writers retired
*/
private int retireIdleWriters() {
+ LOG.info("Attempting close idle writers");
int count = 0;
long now = System.currentTimeMillis();
ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
@@ -310,12 +333,14 @@ public class HiveBolt extends BaseRichBolt {
for(HiveEndPoint ep : retirees) {
try {
LOG.info("Closing idle Writer to Hive end point : {}", ep);
- allWriters.remove(ep).close();
+ allWriters.remove(ep).flushAndClose();
} catch (IOException e) {
LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
} catch (InterruptedException e) {
LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.warn("Error while attempting to close writer for end point: " + ep, e);
}
}
return count;
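The heartbeat rework above is the core of this change: instead of setting a timeToSendHeartBeat flag and waiting for the next execute() call to act on it, the one-shot TimerTask now sends heartbeats itself and re-arms only while the sendHeartBeat gate is open, so cleanup() can stop the cycle by clearing the gate. A minimal, self-contained sketch of that pattern, using only java.util.Timer (the Heartbeater and beatAll names are illustrative, not part of storm-hive):

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class Heartbeater {
    private final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    private final Timer timer = new Timer(true);   // daemon, as a bolt-side timer should be
    private final long intervalMillis;

    public Heartbeater(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public void start() {
        sendHeartBeat.set(true);
        schedule();
    }

    private void schedule() {
        // one-shot task that re-arms itself only while the gate is open
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (sendHeartBeat.get()) {
                        beatAll();       // stand-in for writer.heartBeat() on every cached writer
                        schedule();      // re-arm only while still running
                    }
                } catch (Exception e) {
                    // heartbeat failures must not kill the timer thread
                }
            }
        }, intervalMillis);
    }

    protected void beatAll() {
        // heartbeat every cached writer here
    }

    public void stop() {
        sendHeartBeat.set(false);  // a pending task sees false and does not re-arm
        timer.cancel();
    }
}

Re-arming inside run() rather than using scheduleAtFixedRate means a slow heartbeat round simply delays the next one instead of letting ticks queue up, which matches the behavior of setupHeartBeatTimer() in the diff.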
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index 8c2f55d..ab81a75 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -36,11 +36,11 @@ public class HiveOptions implements Serializable {
protected String tableName;
protected String metaStoreURI;
protected Integer txnsPerBatch = 100;
- protected Integer maxOpenConnections = 500;
+ protected Integer maxOpenConnections = 10;
protected Integer batchSize = 15000;
- protected Integer idleTimeout = 0;
- protected Integer callTimeout = 10000;
- protected Integer heartBeatInterval = 240;
+ protected Integer idleTimeout = 60000;
+ protected Integer callTimeout = 0;
+ protected Integer heartBeatInterval = 60;
protected Boolean autoCreatePartitions = true;
protected String kerberosPrincipal;
protected String kerberosKeytab;
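Note the changed defaults in this hunk: maxOpenConnections drops from 500 to 10, idleTimeout rises from 0 to 60000, callTimeout drops from 10000 to 0, and heartBeatInterval drops from 240 to 60 seconds. A topology that relied on the old values can pin them explicitly through the builder methods used elsewhere in this patch; a minimal sketch (the metastore URI, database, table, and column names are placeholders):

import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.tuple.Fields;

public class PinnedHiveOptions {
    public static HiveBolt build(String metaStoreURI) {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("id", "msg"));
        HiveOptions options = new HiveOptions(metaStoreURI, "mydb", "mytable", mapper)
                .withTxnsPerBatch(10)
                .withBatchSize(1000)
                .withIdleTimeout(0)            // pre-patch default: idle writers are never retired
                .withMaxOpenConnections(500);  // pre-patch default connection cap
        return new HiveBolt(options);
    }
}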
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
index 5483b07..591d565 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -25,12 +25,17 @@ import org.apache.hive.hcatalog.streaming.*;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.io.File;
import java.io.IOException;
public class HiveUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
if(partitionVals==null) {
@@ -72,5 +77,9 @@ public class HiveUtils {
}
}
-
+ public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
+ for (Map.Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ LOG.info("cached writers {} ", entry.getValue());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index 233fec0..4df1c60 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -46,15 +46,15 @@ public class HiveWriter {
private final StreamingConnection connection;
private final int txnsPerBatch;
private final RecordWriter recordWriter;
- private TransactionBatch txnBatch;
private final ExecutorService callTimeoutPool;
private final long callTimeout;
-
+ private final Object txnBatchLock = new Object();
+ private TransactionBatch txnBatch;
private long lastUsed; // time of last flush on this writer
protected boolean closed; // flag indicating HiveWriter was closed
private boolean autoCreatePartitions;
- private boolean heartBeatNeeded = false;
private UserGroupInformation ugi;
+ private int totalRecords = 0;
public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
boolean autoCreatePartitions, long callTimeout,
@@ -83,11 +83,9 @@ public class HiveWriter {
@Override
public String toString() {
- return endPoint.toString();
- }
-
- public void setHeartBeatNeeded() {
- heartBeatNeeded = true;
+ return "{ "
+ + "endPoint = " + endPoint.toString()
+ + ", TransactionBatch = " + txnBatch.toString() + " }";
}
/**
@@ -97,7 +95,7 @@ public class HiveWriter {
* @throws InterruptedException
*/
public synchronized void write(final byte[] record)
- throws WriteFailure, InterruptedException {
+ throws WriteFailure, SerializationError, InterruptedException {
if (closed) {
throw new IllegalStateException("This hive streaming writer was closed " +
"and thus no longer able to write : " + endPoint);
@@ -109,9 +107,12 @@ public class HiveWriter {
@Override
public Void call() throws StreamingException, InterruptedException {
txnBatch.write(record);
+ totalRecords++;
return null;
}
});
+ } catch(SerializationError se) {
+ throw new SerializationError(endPoint.toString() + " SerializationError", se);
} catch(StreamingException e) {
throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
} catch(TimeoutException e) {
@@ -120,29 +121,20 @@ public class HiveWriter {
}
/**
- * Commits the current Txn.
+ * Commits the current Txn if totalRecords > 0.
* If 'rollToNext' is true, will switch to next Txn in batch or to a
* new TxnBatch if current Txn batch is exhausted
- * TODO: see what to do when there are errors in each IO call stage
*/
public void flush(boolean rollToNext)
throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
- if(heartBeatNeeded) {
- heartBeatNeeded = false;
- heartBeat();
- }
- lastUsed = System.currentTimeMillis();
+ // if there are no records do not call flush
+ if (totalRecords <= 0) return;
try {
- commitTxn();
- if(txnBatch.remainingTransactions() == 0) {
- closeTxnBatch();
- txnBatch = null;
- if(rollToNext) {
- txnBatch = nextTxnBatch(recordWriter);
- }
- } else if(rollToNext) {
- LOG.debug("Switching to next Txn for {}", endPoint);
- txnBatch.beginNextTransaction(); // does not block
+ synchronized(txnBatchLock) {
+ commitTxn();
+ nextTxn(rollToNext);
+ totalRecords = 0;
+ lastUsed = System.currentTimeMillis();
}
} catch(StreamingException e) {
throw new TxnFailure(txnBatch, e);
@@ -154,28 +146,46 @@ public class HiveWriter {
*/
public void heartBeat() throws InterruptedException {
// 1) schedule the heartbeat on one thread in pool
- try {
- callWithTimeout(new CallRunner<Void>() {
- @Override
+ synchronized(txnBatchLock) {
+ try {
+ callWithTimeout(new CallRunner<Void>() {
+ @Override
public Void call() throws Exception {
- try {
- LOG.debug("Sending heartbeat on batch " + txnBatch);
- txnBatch.heartbeat();
- } catch (StreamingException e) {
- LOG.warn("Heartbeat error on batch " + txnBatch, e);
+ try {
+ LOG.info("Sending heartbeat on batch " + txnBatch);
+ txnBatch.heartbeat();
+ } catch (StreamingException e) {
+ LOG.warn("Heartbeat error on batch " + txnBatch, e);
+ }
+ return null;
}
- return null;
- }
- });
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
- // Suppressing exceptions as we don't care for errors on heartbeats
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+ // Suppressing exceptions as we don't care for errors on heartbeats
+ }
}
}
/**
+ * Returns totalRecords written so far in the current transaction
+ * @return totalRecords
+ */
+ public int getTotalRecords() {
+ return totalRecords;
+ }
+
+ /**
+ * Flush and Close current transactionBatch.
+ */
+ public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure,
+ IOException, InterruptedException {
+ flush(false);
+ close();
+ }
+ /**
* Close the Transaction Batch and connection
* @throws IOException
* @throws InterruptedException
@@ -246,8 +256,8 @@ public class HiveWriter {
return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
}
});
- batch.beginNextTransaction();
- LOG.debug("Acquired {}. Switching to first txn", batch);
+ batch.beginNextTransaction();
+ LOG.debug("Acquired {}. Switching to first txn", batch);
} catch(TimeoutException e) {
throw new TxnBatchFailure(endPoint, e);
} catch(StreamingException e) {
@@ -279,10 +289,17 @@ public class HiveWriter {
* Aborts the current Txn and switches to next Txn.
* @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
*/
- public void abort() throws InterruptedException {
- abortTxn();
+ public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
+ synchronized(txnBatchLock) {
+ abortTxn();
+ nextTxn(true); // roll to next
+ }
}
+
+ /**
+ * Aborts current Txn in the txnBatch.
+ */
private void abortTxn() throws InterruptedException {
LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
try {
@@ -305,6 +322,24 @@ public class HiveWriter {
/**
+ * If there are remaining transactions in the current txnBatch, begins the next transaction;
+ * otherwise closes the batch and, if rollToNext is set, fetches a new txnBatch.
+ * @param rollToNext whether to roll to the next transaction or batch
+ */
+ private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
+ if(txnBatch.remainingTransactions() == 0) {
+ closeTxnBatch();
+ txnBatch = null;
+ if(rollToNext) {
+ txnBatch = nextTxnBatch(recordWriter);
+ }
+ } else if(rollToNext) {
+ LOG.debug("Switching to next Txn for {}", endPoint);
+ txnBatch.beginNextTransaction(); // does not block
+ }
+ }
+
+ /**
* If the current thread has been interrupted, then throws an
* exception.
* @throws InterruptedException
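As the write() and heartBeat() hunks above show, HiveWriter funnels each Hive Streaming call through callWithTimeout on the single-threaded callTimeoutPool so the call can be bounded by callTimeout. A generic sketch of that executor-with-deadline pattern, under the assumption (matching the new callTimeout default of 0) that a non-positive timeout means wait indefinitely; the TimeoutCaller name is illustrative, only CallRunner/callWithTimeout appear in the diff:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutCaller {
    private final ExecutorService pool = Executors.newFixedThreadPool(1);
    private final long callTimeoutMillis; // non-positive means wait indefinitely

    public TimeoutCaller(long callTimeoutMillis) {
        this.callTimeoutMillis = callTimeoutMillis;
    }

    public <T> T callWithTimeout(Callable<T> call) throws Exception {
        Future<T> future = pool.submit(call);
        if (callTimeoutMillis > 0) {
            try {
                return future.get(callTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the stuck call
                throw e;
            }
        }
        return future.get(); // unbounded wait when no timeout is configured
    }
}

Cancelling the future on timeout interrupts the in-flight call, so the worker thread does not stay stuck on a hung metastore.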
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index dd296e4..08a5953 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -15,6 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+
package org.apache.storm.hive.trident;
import org.apache.storm.trident.operation.TridentCollector;
@@ -38,7 +40,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
@@ -55,9 +57,10 @@ public class HiveState implements State {
private ExecutorService callTimeoutPool;
private transient Timer heartBeatTimer;
private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+ private volatile boolean sendHeartBeat = true; // volatile: cleared by cleanup(), read by the timer thread
private UserGroupInformation ugi = null;
private Boolean kerberosEnabled = false;
- HashMap<HiveEndPoint, HiveWriter> allWriters;
+ private Map<HiveEndPoint, HiveWriter> allWriters;
public HiveState(HiveOptions options) {
this.options = options;
@@ -93,7 +96,7 @@ public class HiveState implements State {
}
}
- allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+ allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
String timeoutName = "hive-bolt-%d";
this.callTimeoutPool = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
@@ -116,9 +119,6 @@ public class HiveState implements State {
private void writeTuples(List<TridentTuple> tuples)
throws Exception {
- if(timeToSendHeartBeat.compareAndSet(true, false)) {
- enableHeartBeatOnAllWriters();
- }
for (TridentTuple tuple : tuples) {
List<String> partitionVals = options.getMapper().mapPartitions(tuple);
HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
@@ -134,20 +134,18 @@ public class HiveState implements State {
private void abortAndCloseWriters() {
try {
+ sendHeartBeat = false;
abortAllWriters();
closeAllWriters();
- } catch(InterruptedException e) {
- LOG.warn("unable to close hive connections. ", e);
- } catch(IOException ie) {
+ } catch(Exception ie) {
LOG.warn("unable to close hive connections. ", ie);
}
}
/**
* Abort current Txn on all writers
- * @return number of writers retired
*/
- private void abortAllWriters() throws InterruptedException {
+ private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
entry.getValue().abort();
}
@@ -172,8 +170,15 @@ public class HiveState implements State {
heartBeatTimer.schedule(new TimerTask() {
@Override
public void run() {
- timeToSendHeartBeat.set(true);
- setupHeartBeatTimer();
+ try {
+ if (sendHeartBeat) {
+ LOG.debug("Start sending heartbeat on all writers");
+ sendHeartBeatOnAllWriters();
+ setupHeartBeatTimer();
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to heartbeat on HiveWriter ", e);
+ }
}
}, options.getHeartBeatInterval() * 1000);
}
@@ -186,9 +191,9 @@ public class HiveState implements State {
}
}
- private void enableHeartBeatOnAllWriters() {
+ private void sendHeartBeatOnAllWriters() throws InterruptedException {
for (HiveWriter writer : allWriters.values()) {
- writer.setHeartBeatNeeded();
+ writer.heartBeat();
}
}
@@ -199,7 +204,7 @@ public class HiveState implements State {
if( writer == null ) {
LOG.info("Creating Writer to Hive end point : " + endPoint);
writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
- if(allWriters.size() > options.getMaxOpenConnections()){
+ if(allWriters.size() > (options.getMaxOpenConnections() - 1)){
int retired = retireIdleWriters();
if(retired==0) {
retireEldestWriter();
@@ -274,6 +279,7 @@ public class HiveState implements State {
public void cleanup() {
for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
try {
+ sendHeartBeat = false;
HiveWriter w = entry.getValue();
LOG.info("Flushing writer to {}", w);
w.flush(false);
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index 7e0e1f2..d6e3c71 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.hive.trident;
import org.apache.storm.task.IMetricsContext;
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
index 82cfc15..062f7fb 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.hive.trident;
import org.apache.storm.trident.operation.TridentCollector;
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
new file mode 100644
index 0000000..607bd61
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MockTupleHelpers;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class BucketTestHiveTopology {
+ static final String USER_SPOUT_ID = "user-spout";
+ static final String BOLT_ID = "my-hive-bolt";
+ static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+ public static void main(String[] args) throws Exception {
+ if ((args == null) || (args.length < 7)) {
+ System.out.println("Usage: BucketTestHiveTopology metastoreURI "
+ + "dbName tableName dataFileLocation hiveBatchSize " +
+ "hiveTickTupl]eIntervalSecs workers [topologyNamey] [keytab file]"
+ + " [principal name] ");
+ System.exit(1);
+ }
+ String metaStoreURI = args[0];
+ String dbName = args[1];
+ String tblName = args[2];
+ String sourceFileLocation = args[3];
+ Integer hiveBatchSize = Integer.parseInt(args[4]);
+ Integer hiveTickTupleIntervalSecs = Integer.parseInt(args[5]);
+ Integer workers = Integer.parseInt(args[6]);
+ String[] colNames = { "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
+ "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk",
+ "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+ "ss_wholesale_cost", "ss_list_price", "ss_sales_price",
+ "ss_ext_discount_amt", "ss_ext_sales_price",
+ "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
+ "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
+ "ss_net_profit" };
+ Config config = new Config();
+ config.setNumWorkers(workers);
+ UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation);
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames)).withTimeAsPartitionField("yyyy/MM/dd");
+ HiveOptions hiveOptions;
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(hiveBatchSize);
+ // only set the tick tuple interval when it is positive, since it most likely affects storm metrics
+ // the interval had to be a mandatory argument because the arguments are positional
+ if (hiveTickTupleIntervalSecs > 0) {
+ hiveOptions.withTickTupleInterval(hiveTickTupleIntervalSecs);
+ }
+ if (args.length == 10) {
+ hiveOptions.withKerberosKeytab(args[8]).withKerberosPrincipal(args[9]);
+ }
+ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(USER_SPOUT_ID, spout, 1);
+ // SentenceSpout --> MyBolt
+ builder.setBolt(BOLT_ID, hiveBolt, 14)
+ .shuffleGrouping(USER_SPOUT_ID);
+ if (args.length == 7) { // exactly the required args, no topology name: run on a local cluster
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(20);
+ cluster.killTopology(TOPOLOGY_NAME);
+ System.out.println("cluster begin to shutdown");
+ cluster.shutdown();
+ System.out.println("cluster shutdown");
+ System.exit(0);
+ } else {
+ StormSubmitter.submitTopology(args[7], config, builder.createTopology());
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) {
+ // best-effort sleep in a test helper; interruption can be safely ignored
+ }
+ }
+
+ public static class UserDataSpout extends BaseRichSpout {
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String filePath;
+ private BufferedReader br;
+ private int count = 0;
+ private long total = 0L;
+ private String[] outputFields = { "ss_sold_date_sk", "ss_sold_time_sk",
+ "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk",
+ "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number",
+ "ss_quantity", "ss_wholesale_cost", "ss_list_price",
+ "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price",
+ "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
+ "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
+ "ss_net_profit" };
+
+ public UserDataSpout withDataFile (String filePath) {
+ this.filePath = filePath;
+ return this;
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(this.outputFields));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ try {
+ this.br = new BufferedReader(new FileReader(new File(this
+ .filePath)));
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ public void nextTuple() {
+ String line;
+ try {
+ if ((line = br.readLine()) != null) {
+ System.out.println("*********" + line);
+ String[] values = line.split("\\|", -1);
+ // split with limit -1 keeps a trailing empty field;
+ // trim the array to the expected column count
+ values = Arrays.copyOfRange(values, 0,
+ this.outputFields.length);
+ Values tupleValues = new Values(values);
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, tupleValues);
+ this.collector.emit(tupleValues, msgId);
+ count++;
+ total++;
+ if (count > 1000) {
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ public void ack(Object msgId) {
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
index 1132587..8b61d5e 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -51,7 +51,8 @@ public class HiveTopology {
config.setNumWorkers(1);
UserDataSpout spout = new UserDataSpout();
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames));
+ .withTimeAsPartitionField("yyyy/MM/dd/hh")
+ .withColumnFields(new Fields(colNames));
HiveOptions hiveOptions;
if (args.length == 6) {
hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
@@ -64,7 +65,8 @@ public class HiveTopology {
hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
.withTxnsPerBatch(10)
.withBatchSize(100)
- .withIdleTimeout(10);
+ .withIdleTimeout(10)
+ .withMaxOpenConnections(1);
}
HiveBolt hiveBolt = new HiveBolt(hiveOptions);
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index ead2c8d..0cf0084 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -204,7 +204,7 @@ public class TestHiveBolt {
@Test
public void testWithTimeformat()
throws Exception {
- String[] partNames1 = {"date"};
+ String[] partNames1 = {"dt"};
String timeFormat = "yyyy/MM/dd";
HiveSetupUtil.dropDB(conf,dbName1);
HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
@@ -213,8 +213,9 @@ public class TestHiveBolt {
.withColumnFields(new Fields(colNames))
.withTimeAsPartitionField(timeFormat);
HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
- .withTxnsPerBatch(2)
- .withBatchSize(1);
+ .withTxnsPerBatch(2)
+ .withBatchSize(1)
+ .withMaxOpenConnections(1);
bolt = new HiveBolt(hiveOptions);
bolt.prepare(config,null,collector);
Integer id = 100;
@@ -319,7 +320,7 @@ public class TestHiveBolt {
//This forces a failure of all the flush attempts
doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
- doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
+
spyBolt.prepare(config, null, new OutputCollector(collector));
@@ -383,7 +384,7 @@ public class TestHiveBolt {
//The tick should NOT cause any acks since the batch was empty and ticks themselves are no longer acked
Tuple mockTick = MockTupleHelpers.mockTickTuple();
bolt.execute(mockTick);
- verify(collector).ack(mockTick);
+ verifyZeroInteractions(collector);
bolt.cleanup();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index 952b0fb..e43fec6 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.bolt.mapper.HiveMapper;
import org.apache.storm.hive.bolt.HiveSetupUtil;
@@ -140,18 +141,22 @@ public class TestHiveWriter {
, callTimeoutPool, mapper, ugi);
Tuple tuple = generateTestTuple("1","abc");
writer.write(mapper.mapRecord(tuple));
+ tuple = generateTestTuple("2","def");
+ writer.write(mapper.mapRecord(tuple));
+ Assert.assertEquals(writer.getTotalRecords(), 2);
checkRecordCountInTable(dbName,tblName,0);
writer.flush(true);
+ Assert.assertEquals(writer.getTotalRecords(), 0);
- tuple = generateTestTuple("2","def");
+ tuple = generateTestTuple("3","ghi");
writer.write(mapper.mapRecord(tuple));
writer.flush(true);
- tuple = generateTestTuple("3","ghi");
+ tuple = generateTestTuple("4","klm");
writer.write(mapper.mapRecord(tuple));
writer.flush(true);
writer.close();
- checkRecordCountInTable(dbName,tblName,3);
+ checkRecordCountInTable(dbName,tblName,4);
}
private Tuple generateTestTuple(Object id, Object msg) {
@@ -167,7 +172,7 @@ public class TestHiveWriter {
}
private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
- throws HiveWriter.WriteFailure, InterruptedException {
+ throws HiveWriter.WriteFailure, InterruptedException, SerializationError {
Integer id = 100;
String msg = "test-123";
for (int i = 1; i <= count; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
index 10921bc..d6c1d65 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -179,7 +179,7 @@ public class TridentHiveTopology {
}
@Override
- public Map<String, Object> getComponentConfiguration() {
+ public Map getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
[2/3] storm git commit: Merge branch 'STORM-1030-V1' of https://github.com/harshach/incubator-storm into STORM-1030
Posted by ka...@apache.org.
Merge branch 'STORM-1030-V1' of https://github.com/harshach/incubator-storm into STORM-1030
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2eb80a0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2eb80a0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2eb80a0f
Branch: refs/heads/master
Commit: 2eb80a0fd783a1bec3192802bf8e01d6ff152d74
Parents: da7969e e5f0e91
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Mar 29 16:41:29 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Mar 29 16:41:29 2016 +0900
----------------------------------------------------------------------
[3/3] storm git commit: Merge branch 'STORM-1030'
Posted by ka...@apache.org.
Merge branch 'STORM-1030'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2ca82f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2ca82f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2ca82f8
Branch: refs/heads/master
Commit: e2ca82f8f55ecdaeac31d3da6c1fc634d9bc4b25
Parents: 4eaaa0b 2eb80a0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Mar 29 17:01:22 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Mar 29 17:01:22 2016 +0900
----------------------------------------------------------------------