You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/29 10:00:25 UTC

[1/2] storm git commit: STORM-1030. Hive Connector Fixes.

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7edd00a36 -> 1cfaf1245


STORM-1030. Hive Connector Fixes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a276616b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a276616b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a276616b

Branch: refs/heads/1.x-branch
Commit: a276616bedbbee28ca318059853ae0e94d06408b
Parents: 7edd00a
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Nov 9 16:40:34 2015 -0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Mar 29 16:43:09 2016 +0900

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 ++++++++------
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 ++++++++-----
 .../apache/storm/hive/trident/HiveState.java    |  38 ++--
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++++++++++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 0646dcb..ef06e4b 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
@@ -56,14 +56,14 @@ public class HiveBolt extends  BaseRichBolt {
     private ExecutorService callTimeoutPool;
     private transient Timer heartBeatTimer;
     private Boolean kerberosEnabled = false;
-    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
     private UserGroupInformation ugi = null;
-    HashMap<HiveEndPoint, HiveWriter> allWriters;
+    private Map<HiveEndPoint, HiveWriter> allWriters;
     private List<Tuple> tupleBatch;
 
     public HiveBolt(HiveOptions options) {
         this.options = options;
-        tupleBatch = new LinkedList<>();
+        tupleBatch = new LinkedList<Tuple>();
     }
 
     @Override
@@ -87,10 +87,12 @@ public class HiveBolt extends  BaseRichBolt {
                 }
             }
             this.collector = collector;
-            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
             String timeoutName = "hive-bolt-%d";
             this.callTimeoutPool = Executors.newFixedThreadPool(1,
                                 new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+            sendHeartBeat.set(true);
             heartBeatTimer = new Timer();
             setupHeartBeatTimer();
 
@@ -105,44 +107,37 @@ public class HiveBolt extends  BaseRichBolt {
             boolean forceFlush = false;
             if (TupleUtils.isTick(tuple)) {
                 LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize());
-                collector.ack(tuple);
                 forceFlush = true;
-            }
-            else {
+            } else {
                 List<String> partitionVals = options.getMapper().mapPartitions(tuple);
                 HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
                 HiveWriter writer = getOrCreateWriter(endPoint);
-                if (timeToSendHeartBeat.compareAndSet(true, false)) {
-                    enableHeartBeatOnAllWriters();
-                }
                 writer.write(options.getMapper().mapRecord(tuple));
                 tupleBatch.add(tuple);
                 if (tupleBatch.size() >= options.getBatchSize())
                     forceFlush = true;
             }
+
             if(forceFlush && !tupleBatch.isEmpty()) {
                 flushAllWriters(true);
                 LOG.info("acknowledging tuples after writers flushed ");
-                for(Tuple t : tupleBatch)
+                for(Tuple t : tupleBatch) {
                     collector.ack(t);
+                }
                 tupleBatch.clear();
             }
+        } catch(SerializationError se) {
+            LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple);
+            this.collector.reportError(se);
+            collector.ack(tuple);
         } catch(Exception e) {
             this.collector.reportError(e);
             collector.fail(tuple);
-            try {
-                flushAndCloseWriters();
-                LOG.info("acknowledging tuples after writers flushed and closed");
-                for (Tuple t : tupleBatch)
-                    collector.ack(t);
-                tupleBatch.clear();
-            } catch (Exception e1) {
-                //If flushAndClose fails assume tuples are lost, do not ack
-                LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
-                for (Tuple t : tupleBatch)
-                    collector.fail(t);
-                tupleBatch.clear();
+            for (Tuple t : tupleBatch) {
+                collector.fail(t);
             }
+            tupleBatch.clear();
+            abortAndCloseWriters();
         }
     }
 
@@ -153,13 +148,11 @@ public class HiveBolt extends  BaseRichBolt {
 
     @Override
     public void cleanup() {
+        sendHeartBeat.set(false);
         for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             try {
                 HiveWriter w = entry.getValue();
-                LOG.info("Flushing writer to {}", w);
-                w.flush(false);
-                LOG.info("Closing writer to {}", w);
-                w.close();
+                w.flushAndClose();
             } catch (Exception ex) {
                 LOG.warn("Error while closing writer to " + entry.getKey() +
                          ". Exception follows.", ex);
@@ -181,6 +174,7 @@ public class HiveBolt extends  BaseRichBolt {
                 LOG.warn("shutdown interrupted on " + execService, ex);
             }
         }
+
         callTimeoutPool = null;
         super.cleanup();
         LOG.info("Hive Bolt stopped");
@@ -188,8 +182,14 @@ public class HiveBolt extends  BaseRichBolt {
 
     @Override
     public Map<String, Object> getComponentConfiguration() {
-        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(),
-                options.getTickTupleInterval());
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null)
+            conf = new Config();
+
+        if (options.getTickTupleInterval() > 0)
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
+
+        return conf;
     }
 
     private void setupHeartBeatTimer() {
@@ -197,13 +197,26 @@ public class HiveBolt extends  BaseRichBolt {
             heartBeatTimer.schedule(new TimerTask() {
                     @Override
                     public void run() {
-                        timeToSendHeartBeat.set(true);
-                        setupHeartBeatTimer();
+                        try {
+                            if (sendHeartBeat.get()) {
+                                LOG.debug("Start sending heartbeat on all writers");
+                                sendHeartBeatOnAllWriters();
+                                setupHeartBeatTimer();
+                            }
+                        } catch (Exception e) {
+                            LOG.warn("Failed to heartbeat on HiveWriter ", e);
+                        }
                     }
                 }, options.getHeartBeatInterval() * 1000);
         }
     }
 
+    private void sendHeartBeatOnAllWriters() throws InterruptedException {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.heartBeat();
+        }
+    }
+
     void flushAllWriters(boolean rollToNext)
         throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
         for(HiveWriter writer: allWriters.values()) {
@@ -211,54 +224,60 @@ public class HiveBolt extends  BaseRichBolt {
         }
     }
 
-    /**
-     * Closes all writers and remove them from cache
-     * @return number of writers retired
-     */
-    private void closeAllWriters() {
+    void abortAndCloseWriters() {
         try {
-            //1) Retire writers
-            for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-                entry.getValue().close();
-            }
-            //2) Clear cache
-            allWriters.clear();
-        } catch(Exception e) {
-            LOG.warn("unable to close writers. ", e);
+            abortAllWriters();
+            closeAllWriters();
+        }  catch(Exception ie) {
+            LOG.warn("unable to close hive connections. ", ie);
         }
     }
 
-    void flushAndCloseWriters() throws Exception {
-        try {
-            flushAllWriters(false);
-        } catch(Exception e) {
-            LOG.warn("unable to flush hive writers. ", e);
-            throw e;
-        } finally {
-            closeAllWriters();
+    /**
+     * Abort current Txn on all writers
+     */
+    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                entry.getValue().abort();
+            } catch (Exception e) {
+                LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() +" due to exception ", e);
+            }
         }
     }
 
-    private void enableHeartBeatOnAllWriters() {
-        for (HiveWriter writer : allWriters.values()) {
-            writer.setHeartBeatNeeded();
+    /**
+     * Closes all writers and removes them from the cache
+     */
+    private void closeAllWriters() {
+        //1) Retire writers
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch(Exception e) {
+                LOG.warn("unable to close writers. ", e);
+            }
         }
+        //2) Clear cache
+        allWriters.clear();
     }
 
     private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
         throws HiveWriter.ConnectFailure, InterruptedException {
         try {
             HiveWriter writer = allWriters.get( endPoint );
-            if( writer == null ) {
+            if (writer == null) {
                 LOG.debug("Creating Writer to Hive end point : " + endPoint);
                 writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
-                if(allWriters.size() > options.getMaxOpenConnections()){
+                if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
+                    LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(), options.getMaxOpenConnections());
                     int retired = retireIdleWriters();
                     if(retired==0) {
                         retireEldestWriter();
                     }
                 }
                 allWriters.put(endPoint, writer);
+                HiveUtils.logAllHiveEndPoints(allWriters);
             }
             return writer;
         } catch (HiveWriter.ConnectFailure e) {
@@ -271,22 +290,25 @@ public class HiveBolt extends  BaseRichBolt {
      * Locate writer that has not been used for longest time and retire it
      */
     private void retireEldestWriter() {
+        LOG.info("Attempting close eldest writers");
         long oldestTimeStamp = System.currentTimeMillis();
         HiveEndPoint eldest = null;
         for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+            if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                 eldest = entry.getKey();
                 oldestTimeStamp = entry.getValue().getLastUsed();
             }
         }
         try {
             LOG.info("Closing least used Writer to Hive end point : " + eldest);
-            allWriters.remove(eldest).close();
+            allWriters.remove(eldest).flushAndClose();
         } catch (IOException e) {
             LOG.warn("Failed to close writer for end point: " + eldest, e);
         } catch (InterruptedException e) {
             LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
             Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
         }
     }
 
@@ -295,6 +317,7 @@ public class HiveBolt extends  BaseRichBolt {
      * @return number of writers retired
      */
     private int retireIdleWriters() {
+        LOG.info("Attempting close idle writers");
         int count = 0;
         long now = System.currentTimeMillis();
         ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
@@ -310,12 +333,14 @@ public class HiveBolt extends  BaseRichBolt {
         for(HiveEndPoint ep : retirees) {
             try {
                 LOG.info("Closing idle Writer to Hive end point : {}", ep);
-                allWriters.remove(ep).close();
+                allWriters.remove(ep).flushAndClose();
             } catch (IOException e) {
                 LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
                 Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
             }
         }
         return count;

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index 8c2f55d..ab81a75 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -36,11 +36,11 @@ public class HiveOptions implements Serializable {
     protected String tableName;
     protected String metaStoreURI;
     protected Integer txnsPerBatch = 100;
-    protected Integer maxOpenConnections = 500;
+    protected Integer maxOpenConnections = 10;
     protected Integer batchSize = 15000;
-    protected Integer idleTimeout = 0;
-    protected Integer callTimeout = 10000;
-    protected Integer heartBeatInterval = 240;
+    protected Integer idleTimeout = 60000;
+    protected Integer callTimeout = 0;
+    protected Integer heartBeatInterval = 60;
     protected Boolean autoCreatePartitions = true;
     protected String kerberosPrincipal;
     protected String kerberosKeytab;

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
index 5483b07..591d565 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -25,12 +25,17 @@ import org.apache.hive.hcatalog.streaming.*;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.io.File;
 import java.io.IOException;
 
 public class HiveUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
 
     public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
         if(partitionVals==null) {
@@ -72,5 +77,9 @@ public class HiveUtils {
          }
      }
 
-
+    public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
+        for (Map.Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            LOG.info("cached writers {} ", entry.getValue());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index 233fec0..4df1c60 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -46,15 +46,15 @@ public class HiveWriter {
     private final StreamingConnection connection;
     private final int txnsPerBatch;
     private final RecordWriter recordWriter;
-    private TransactionBatch txnBatch;
     private final ExecutorService callTimeoutPool;
     private final long callTimeout;
-
+    private final Object txnBatchLock = new Object();
+    private TransactionBatch txnBatch;
     private long lastUsed; // time of last flush on this writer
     protected boolean closed; // flag indicating HiveWriter was closed
     private boolean autoCreatePartitions;
-    private boolean heartBeatNeeded = false;
     private UserGroupInformation ugi;
+    private int totalRecords = 0;
 
     public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
                       boolean autoCreatePartitions, long callTimeout,
@@ -83,11 +83,9 @@ public class HiveWriter {
 
     @Override
     public String toString() {
-        return endPoint.toString();
-    }
-
-    public void setHeartBeatNeeded() {
-        heartBeatNeeded = true;
+          return "{ "
+              + "endPoint = " + endPoint.toString()
+              + ", TransactionBatch = " + txnBatch.toString() + " }";
     }
 
     /**
@@ -97,7 +95,7 @@ public class HiveWriter {
      * @throws InterruptedException
      */
     public synchronized void write(final byte[] record)
-        throws WriteFailure, InterruptedException {
+        throws WriteFailure, SerializationError, InterruptedException {
         if (closed) {
             throw new IllegalStateException("This hive streaming writer was closed " +
                                             "and thus no longer able to write : " + endPoint);
@@ -109,9 +107,12 @@ public class HiveWriter {
                     @Override
                     public Void call() throws StreamingException, InterruptedException {
                         txnBatch.write(record);
+                        totalRecords++;
                         return null;
                     }
                 });
+        } catch(SerializationError se) {
+            throw new SerializationError(endPoint.toString() + " SerializationError", se);
         } catch(StreamingException e) {
             throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
         } catch(TimeoutException e) {
@@ -120,29 +121,20 @@ public class HiveWriter {
     }
 
     /**
-     * Commits the current Txn.
+     * Commits the current Txn if totalRecords > 0; otherwise returns without doing anything.
      * If 'rollToNext' is true, will switch to next Txn in batch or to a
      *       new TxnBatch if current Txn batch is exhausted
-     * TODO: see what to do when there are errors in each IO call stage
      */
     public void flush(boolean rollToNext)
         throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
-        if(heartBeatNeeded) {
-            heartBeatNeeded = false;
-            heartBeat();
-        }
-        lastUsed = System.currentTimeMillis();
+        // if no records were written since the last flush there is nothing to commit
+        if (totalRecords <= 0) return;
         try {
-            commitTxn();
-            if(txnBatch.remainingTransactions() == 0) {
-                closeTxnBatch();
-                txnBatch = null;
-                if(rollToNext) {
-                    txnBatch = nextTxnBatch(recordWriter);
-                }
-            } else if(rollToNext) {
-                LOG.debug("Switching to next Txn for {}", endPoint);
-                txnBatch.beginNextTransaction(); // does not block
+            synchronized(txnBatchLock) {
+                commitTxn();
+                nextTxn(rollToNext);
+                totalRecords = 0;
+                lastUsed = System.currentTimeMillis();
             }
         } catch(StreamingException e) {
             throw new TxnFailure(txnBatch, e);
@@ -154,28 +146,46 @@ public class HiveWriter {
      */
     public void heartBeat() throws InterruptedException {
         // 1) schedule the heartbeat on one thread in pool
-        try {
-            callWithTimeout(new CallRunner<Void>() {
-                    @Override
+        synchronized(txnBatchLock) {
+            try {
+                callWithTimeout(new CallRunner<Void>() {
+                        @Override
                         public Void call() throws Exception {
-                        try {
-                            LOG.debug("Sending heartbeat on batch " + txnBatch);
-                            txnBatch.heartbeat();
-                        } catch (StreamingException e) {
-                            LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                            try {
+                                LOG.info("Sending heartbeat on batch " + txnBatch);
+                                txnBatch.heartbeat();
+                            } catch (StreamingException e) {
+                                LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                            }
+                            return null;
                         }
-                        return null;
-                    }
-                });
-        } catch (InterruptedException e) {
-            throw e;
-        } catch (Exception e) {
-            LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
-            // Suppressing exceptions as we don't care for errors on heartbeats
+                    });
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch,  e);
+                // Suppressing exceptions as we don't care for errors on heartbeats
+            }
         }
     }
 
     /**
+     * Returns the number of records written since the last successful flush.
+     * @return totalRecords
+     */
+    public int getTotalRecords() {
+        return totalRecords;
+    }
+
+    /**
+     * Flushes pending records via flush(false), i.e. without rolling to the next Txn, then calls close().
+     */
+    public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure,
+            IOException, InterruptedException {
+        flush(false);
+        close();
+    }
+    /**
      * Close the Transaction Batch and connection
      * @throws IOException
      * @throws InterruptedException
@@ -246,8 +256,8 @@ public class HiveWriter {
                     return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
                 }
             });
-        batch.beginNextTransaction();
-        LOG.debug("Acquired {}. Switching to first txn", batch);
+            batch.beginNextTransaction();
+            LOG.debug("Acquired {}. Switching to first txn", batch);
         } catch(TimeoutException e) {
             throw new TxnBatchFailure(endPoint, e);
         } catch(StreamingException e) {
@@ -279,10 +289,17 @@ public class HiveWriter {
      * Aborts the current Txn and switches to next Txn.
      * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
      */
-    public void abort() throws InterruptedException {
-        abortTxn();
+    public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
+        synchronized(txnBatchLock) {
+            abortTxn();
+            nextTxn(true); // roll to next
+        }
     }
 
+
+    /**
+     * Aborts current Txn in the txnBatch.
+     */
     private void abortTxn() throws InterruptedException {
         LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
         try {
@@ -305,6 +322,24 @@ public class HiveWriter {
 
 
     /**
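+     * If the current txnBatch has no remaining transactions, closes it and, when rollToNext is true,
+     * fetches a new txnBatch; otherwise, when rollToNext is true, begins the next transaction in it.
+     * @param rollToNext whether to switch to the next Txn (or a new txnBatch) after the current one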
+     */
+    private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
+        if(txnBatch.remainingTransactions() == 0) {
+            closeTxnBatch();
+            txnBatch = null;
+            if(rollToNext) {
+                txnBatch = nextTxnBatch(recordWriter);
+            }
+        } else if(rollToNext) {
+            LOG.debug("Switching to next Txn for {}", endPoint);
+            txnBatch.beginNextTransaction(); // does not block
+        }
+    }
+
+    /**
      * If the current thread has been interrupted, then throws an
      * exception.
      * @throws InterruptedException

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index dd296e4..08a5953 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -15,6 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 package org.apache.storm.hive.trident;
 
 import org.apache.storm.trident.operation.TridentCollector;
@@ -38,7 +40,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
@@ -55,9 +57,10 @@ public class HiveState implements State {
     private ExecutorService callTimeoutPool;
     private transient Timer heartBeatTimer;
     private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private Boolean sendHeartBeat = true;
     private UserGroupInformation ugi = null;
     private Boolean kerberosEnabled = false;
-    HashMap<HiveEndPoint, HiveWriter> allWriters;
+    private Map<HiveEndPoint, HiveWriter> allWriters;
 
     public HiveState(HiveOptions options) {
         this.options = options;
@@ -93,7 +96,7 @@ public class HiveState implements State {
                 }
             }
 
-            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
             String timeoutName = "hive-bolt-%d";
             this.callTimeoutPool = Executors.newFixedThreadPool(1,
                                                                 new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
@@ -116,9 +119,6 @@ public class HiveState implements State {
 
     private void writeTuples(List<TridentTuple> tuples)
         throws Exception {
-        if(timeToSendHeartBeat.compareAndSet(true, false)) {
-            enableHeartBeatOnAllWriters();
-        }
         for (TridentTuple tuple : tuples) {
             List<String> partitionVals = options.getMapper().mapPartitions(tuple);
             HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
@@ -134,20 +134,18 @@ public class HiveState implements State {
 
     private void abortAndCloseWriters() {
         try {
+            sendHeartBeat = false;
             abortAllWriters();
             closeAllWriters();
-        } catch(InterruptedException e) {
-            LOG.warn("unable to close hive connections. ", e);
-        } catch(IOException ie) {
+        }  catch(Exception ie) {
             LOG.warn("unable to close hive connections. ", ie);
         }
     }
 
     /**
      * Abort current Txn on all writers
-     * @return number of writers retired
      */
-    private void abortAllWriters() throws InterruptedException {
+    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
         for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
             entry.getValue().abort();
         }
@@ -172,8 +170,15 @@ public class HiveState implements State {
             heartBeatTimer.schedule(new TimerTask() {
                     @Override
                     public void run() {
-                        timeToSendHeartBeat.set(true);
-                        setupHeartBeatTimer();
+                        try {
+                            if (sendHeartBeat) {
+                                LOG.debug("Start sending heartbeat on all writers");
+                                sendHeartBeatOnAllWriters();
+                                setupHeartBeatTimer();
+                            }
+                        } catch (Exception e) {
+                            LOG.warn("Failed to heartbeat on HiveWriter ", e);
+                        }
                     }
                 }, options.getHeartBeatInterval() * 1000);
         }
@@ -186,9 +191,9 @@ public class HiveState implements State {
         }
     }
 
-    private void enableHeartBeatOnAllWriters() {
+    private void sendHeartBeatOnAllWriters() throws InterruptedException {
         for (HiveWriter writer : allWriters.values()) {
-            writer.setHeartBeatNeeded();
+            writer.heartBeat();
         }
     }
 
@@ -199,7 +204,7 @@ public class HiveState implements State {
             if( writer == null ) {
                 LOG.info("Creating Writer to Hive end point : " + endPoint);
                 writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
-                if(allWriters.size() > options.getMaxOpenConnections()){
+                if(allWriters.size() > (options.getMaxOpenConnections() - 1)){
                     int retired = retireIdleWriters();
                     if(retired==0) {
                         retireEldestWriter();
@@ -274,6 +279,7 @@ public class HiveState implements State {
     public void cleanup() {
         for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             try {
+                sendHeartBeat = false;
                 HiveWriter w = entry.getValue();
                 LOG.info("Flushing writer to {}", w);
                 w.flush(false);

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index 7e0e1f2..d6e3c71 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hive.trident;
 
 import org.apache.storm.task.IMetricsContext;

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
index 82cfc15..062f7fb 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hive.trident;
 
 import org.apache.storm.trident.operation.TridentCollector;

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
new file mode 100644
index 0000000..607bd61
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MockTupleHelpers;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/** Manual test topology: a file-backed spout streams '|'-delimited rows into a {@link HiveBolt}. */
+public class BucketTestHiveTopology {
+    static final String USER_SPOUT_ID = "user-spout";
+    static final String BOLT_ID = "my-hive-bolt";
+    static final String TOPOLOGY_NAME = "hive-test-topology1";
+    /** Column names, in file order, shared by the Hive mapper and the spout's output fields. */
+    private static final String[] COLUMN_NAMES = { "ss_sold_date_sk", "ss_sold_time_sk",
+            "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk",
+            "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number",
+            "ss_quantity", "ss_wholesale_cost", "ss_list_price",
+            "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price",
+            "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax",
+            "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax",
+            "ss_net_profit" };
+
+    /** Runs the topology in a LocalCluster when no topologyName is given, otherwise submits it. */
+    public static void main(String[] args) throws Exception {
+        if ((args == null) || (args.length < 7)) {
+            System.out.println("Usage: BucketTestHiveTopology metastoreURI "
+                    + "dbName tableName dataFileLocation hiveBatchSize "
+                    + "hiveTickTupleIntervalSecs workers [topologyName] [keytab file]"
+                    + " [principal name] ");
+            System.exit(1);
+        }
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String sourceFileLocation = args[3];
+        int hiveBatchSize = Integer.parseInt(args[4]);
+        int hiveTickTupleIntervalSecs = Integer.parseInt(args[5]);
+        int workers = Integer.parseInt(args[6]);
+        Config config = new Config();
+        config.setNumWorkers(workers);
+        UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+                .withColumnFields(new Fields(COLUMN_NAMES)).withTimeAsPartitionField("yyyy/MM/dd");
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(10).withBatchSize(hiveBatchSize);
+        // Tick tuple interval is positional, hence mandatory; only a positive value is applied.
+        if (hiveTickTupleIntervalSecs > 0) {
+            hiveOptions.withTickTupleInterval(hiveTickTupleIntervalSecs);
+        }
+        if (args.length == 10) {
+            hiveOptions.withKerberosKeytab(args[8]).withKerberosPrincipal(args[9]);
+        }
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 14).shuffleGrouping(USER_SPOUT_ID);
+        // Exactly the seven mandatory arguments means no topology name was given: run locally.
+        if (args.length == 7) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[7], config, builder.createTopology());
+        }
+    }
+
+    /** Sleeps for the given number of seconds; restores the interrupt flag if interrupted. */
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Spout emitting one tuple per line of the data file; failed tuples are re-emitted. */
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String filePath;
+        private BufferedReader br;
+        private int count = 0;
+        private long total = 0L;
+
+        public UserDataSpout withDataFile (String filePath) {
+            this.filePath = filePath;
+            return this;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(COLUMN_NAMES));
+        }
+
+        @Override
+        public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+            try {
+                this.br = new BufferedReader(new FileReader(new File(this.filePath)));
+            } catch (IOException ex) {
+                // Fail fast: with br unset, every nextTuple() call would throw NullPointerException.
+                throw new IllegalStateException("Unable to open data file " + this.filePath, ex);
+            }
+        }
+
+        @Override
+        public void nextTuple() {
+            String line;
+            try {
+                if ((line = br.readLine()) != null) {
+                    System.out.println("*********" + line);
+                    String[] values = line.split("\\|", -1);
+                    // a line ending in '|' yields a trailing empty string; keep only the declared columns
+                    values = Arrays.copyOfRange(values, 0, COLUMN_NAMES.length);
+                    Values tupleValues = new Values(values);
+                    UUID msgId = UUID.randomUUID();
+                    this.pending.put(msgId, tupleValues);
+                    this.collector.emit(tupleValues, msgId);
+                    count++;
+                    total++;
+                    if (count > 1000) {
+                        count = 0;
+                        System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+                    }
+                }
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            Values tupleValues = this.pending.get(msgId);
+            if (tupleValues != null) { // an unknown or already-acked id has nothing to replay
+                System.out.println("**** RESENDING FAILED TUPLE");
+                this.collector.emit(tupleValues, msgId);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
index 1132587..8b61d5e 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -51,7 +51,8 @@ public class HiveTopology {
         config.setNumWorkers(1);
         UserDataSpout spout = new UserDataSpout();
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-            .withColumnFields(new Fields(colNames));
+                .withTimeAsPartitionField("yyyy/MM/dd/hh")
+                .withColumnFields(new Fields(colNames));
         HiveOptions hiveOptions;
         if (args.length == 6) {
             hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
@@ -64,7 +65,8 @@ public class HiveTopology {
             hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
                 .withTxnsPerBatch(10)
                 .withBatchSize(100)
-                .withIdleTimeout(10);
+                .withIdleTimeout(10)
+                .withMaxOpenConnections(1);
         }
 
         HiveBolt hiveBolt = new HiveBolt(hiveOptions);

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index ead2c8d..0cf0084 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -204,7 +204,7 @@ public class TestHiveBolt {
     @Test
     public void testWithTimeformat()
         throws Exception {
-        String[] partNames1 = {"date"};
+        String[] partNames1 = {"dt"};
         String timeFormat = "yyyy/MM/dd";
         HiveSetupUtil.dropDB(conf,dbName1);
         HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null,
@@ -213,8 +213,9 @@ public class TestHiveBolt {
             .withColumnFields(new Fields(colNames))
             .withTimeAsPartitionField(timeFormat);
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
-            .withTxnsPerBatch(2)
-            .withBatchSize(1);
+                .withTxnsPerBatch(2)
+                .withBatchSize(1)
+                .withMaxOpenConnections(1);
         bolt = new HiveBolt(hiveOptions);
         bolt.prepare(config,null,collector);
         Integer id = 100;
@@ -319,7 +320,7 @@ public class TestHiveBolt {
 
         //This forces a failure of all the flush attempts
         doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
-        doThrow(new Exception()).when(spyBolt).flushAndCloseWriters();
+
 
         spyBolt.prepare(config, null, new OutputCollector(collector));
 
@@ -383,7 +384,7 @@ public class TestHiveBolt {
         //The tick should NOT cause any acks since the batch was empty except for acking itself
         Tuple mockTick = MockTupleHelpers.mockTickTuple();
         bolt.execute(mockTick);
-        verify(collector).ack(mockTick);
+        verifyZeroInteractions(collector);
 
         bolt.cleanup();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index 952b0fb..e43fec6 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
 import org.apache.storm.hive.bolt.HiveSetupUtil;
@@ -140,18 +141,22 @@ public class TestHiveWriter {
                                            , callTimeoutPool, mapper, ugi);
         Tuple tuple = generateTestTuple("1","abc");
         writer.write(mapper.mapRecord(tuple));
+        tuple = generateTestTuple("2","def");
+        writer.write(mapper.mapRecord(tuple));
+        Assert.assertEquals(writer.getTotalRecords(), 2);
         checkRecordCountInTable(dbName,tblName,0);
         writer.flush(true);
+        Assert.assertEquals(writer.getTotalRecords(), 0);
 
-        tuple = generateTestTuple("2","def");
+        tuple = generateTestTuple("3","ghi");
         writer.write(mapper.mapRecord(tuple));
         writer.flush(true);
 
-        tuple = generateTestTuple("3","ghi");
+        tuple = generateTestTuple("4","klm");
         writer.write(mapper.mapRecord(tuple));
         writer.flush(true);
         writer.close();
-        checkRecordCountInTable(dbName,tblName,3);
+        checkRecordCountInTable(dbName,tblName,4);
     }
 
     private Tuple generateTestTuple(Object id, Object msg) {
@@ -167,7 +172,7 @@ public class TestHiveWriter {
     }
 
     private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
-            throws HiveWriter.WriteFailure, InterruptedException {
+        throws HiveWriter.WriteFailure, InterruptedException, SerializationError {
         Integer id = 100;
         String msg = "test-123";
         for (int i = 1; i <= count; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
index 10921bc..d6c1d65 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -179,7 +179,7 @@ public class TridentHiveTopology {
         }
 
         @Override
-        public Map<String, Object> getComponentConfiguration() {
+        public Map getComponentConfiguration() {
             Config conf = new Config();
             conf.setMaxTaskParallelism(1);
             return conf;


[2/2] storm git commit: add STORM-1030 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1030 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1cfaf124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1cfaf124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1cfaf124

Branch: refs/heads/1.x-branch
Commit: 1cfaf12452e130978951b6c7aea6b2c29c340d7a
Parents: a276616
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Mar 29 17:00:10 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Mar 29 17:00:10 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1cfaf124/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae99355..3c36a3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1030: Hive Connector Fixes
  * STORM-676: Storm Trident support for sliding/tumbling windows
  * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster