Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/25 19:52:53 UTC

[05/13] storm git commit: STORM-539. Storm hive bolt and trident state.

STORM-539. Storm hive bolt and trident state.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81772b22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81772b22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81772b22

Branch: refs/heads/master
Commit: 81772b22668d51852b65bc8bbe0d83116c07e383
Parents: 8036109
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Dec 15 14:24:51 2014 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 08:53:43 2015 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +-
 external/storm-hive/README.md                   | 111 +++++
 external/storm-hive/pom.xml                     | 143 +++++++
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 289 +++++++++++++
 .../bolt/mapper/DelimitedRecordHiveMapper.java  | 143 +++++++
 .../storm/hive/bolt/mapper/HiveMapper.java      |  81 ++++
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  | 132 ++++++
 .../apache/storm/hive/common/HiveOptions.java   | 146 +++++++
 .../org/apache/storm/hive/common/HiveUtils.java |  76 ++++
 .../apache/storm/hive/common/HiveWriter.java    | 420 +++++++++++++++++++
 .../apache/storm/hive/trident/HiveState.java    | 306 ++++++++++++++
 .../storm/hive/trident/HiveStateFactory.java    |  31 ++
 .../apache/storm/hive/trident/HiveUpdater.java  |  14 +
 .../apache/storm/hive/bolt/HiveSetupUtil.java   | 220 ++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    | 150 +++++++
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 +++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 330 +++++++++++++++
 .../storm/hive/common/TestHiveWriter.java       | 193 +++++++++
 .../storm/hive/trident/TridentHiveTopology.java | 190 +++++++++
 pom.xml                                         |   4 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 21 files changed, 3148 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ef159e1..df109b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,4 +31,6 @@ target
 .*
 !/.gitignore
 _site
-storm-core/dependency-reduced-pom.xml
+dependency-reduced-pom.xml
+derby.log
+metastore_db

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
new file mode 100644
index 0000000..6461462
--- /dev/null
+++ b/external/storm-hive/README.md
@@ -0,0 +1,111 @@
+# Storm Hive Bolt & Trident State
+
+  Hive offers a streaming API that allows data to be written continuously into Hive. The incoming data
+  can be committed continuously in small batches of records into an existing Hive partition or table. Once the data
+  is committed it is immediately visible to all Hive queries. More info on the Hive Streaming API:
+  https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+  
+  With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
+  To use the Hive Streaming API, users need to create a bucketed table stored in the ORC format. Example below:
+  
+  ```sql
+  create table test_table ( id INT, name STRING, phone STRING, street STRING) partitioned by (city STRING, state STRING) clustered by (id) into 5 buckets stored as orc tblproperties ("orc.compress"="NONE");
+  ```
+  
+
+## HiveBolt
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
+Partitions to which HiveBolt streams can either be pre-created, or
+HiveBolt can optionally create them if they are missing. Fields from tuples are mapped to table columns.
+Users should make sure that tuple field names match the table column names.
+
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
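+
+A minimal sketch of wiring the bolt into a topology, assuming a hypothetical `UserDataSpout` that emits tuples whose field names match `colNames`:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+// hypothetical spout emitting tuples with fields matching the mapper's column fields
+builder.setSpout("user-data-spout", new UserDataSpout(), 1);
+builder.setBolt("hive-bolt", hiveBolt, 1).shuffleGrouping("user-data-spout");
+StormSubmitter.submitTopology("hive-streaming-topology", new Config(), builder.createTopology());
+```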
+
+### RecordHiveMapper
+   This class maps tuple field names to Hive table column names.
+   There are two implementations available:
+ 
+   
+   1) DelimitedRecordHiveMapper
+   2) JsonRecordHiveMapper
+   
+   ```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+    or
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("yyyy/MM/dd");
+   ```
+
+|Arg | Description | Type
+|--- |--- |---
+|withColumnFields| field names in a tuple to be mapped to table column names | Fields (required) |
+|withPartitionFields| field names in a tuple to be mapped to hive table partitions | Fields |
+|withTimeAsPartitionField| users can select the system time as a partition field in the hive table | String. Date format|
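+
+A JsonRecordHiveMapper is configured the same way; a minimal sketch reusing the placeholder column and partition names from the example above:
+
+   ```java
+   JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+   ```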
+
+### HiveOptions
+  
+HiveBolt takes in HiveOptions as a constructor arg.
+
+  ```java
+  HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+  ```
+
+
+HiveOptions params
+
+|Arg  |Description | Type
+|---	|--- |---
+|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String (required) |
+|dbName | database name | String (required) |
+|tblName | table name | String (required) |
+|mapper| Mapper class to map Tuple field names to Table column names | DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
+|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.| Integer. default 100 |
+|withMaxOpenConnections| Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.| Integer. default 500|
+|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
+|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
+|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
+|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | String|
+|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
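+
+On a secure (Kerberized) Hive cluster, the Kerberos principal and keytab must be supplied together (HiveBolt rejects one without the other). A minimal sketch; the principal and keytab path below are placeholders:
+
+  ```java
+  HiveOptions secureHiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withKerberosPrincipal("storm/node.example.com@EXAMPLE.COM")
+                                .withKerberosKeytab("/etc/security/keytabs/storm.keytab");
+  ```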
+
+
+ 
+## HiveState
+
+The Hive Trident state follows a pattern similar to HiveBolt; it also takes HiveOptions as an arg.
+
+```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("yyyy/MM/dd");
+
+   HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+                	     		
+   StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+   TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+ ```
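+
+A minimal sketch of where `stream` and `hiveFields` in the example above might come from; the Trident spout here is a placeholder:
+
+```java
+TridentTopology topology = new TridentTopology();
+Fields hiveFields = new Fields(colNames);                       // tuple fields handed to the state
+Stream stream = topology.newStream("hiveTridentSpout", spout);  // 'spout' is a placeholder batch spout
+```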
+   
+ 
+ 
+
+
+
+
+
+
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
new file mode 100644
index 0000000..adf7567
--- /dev/null
+++ b/external/storm-hive/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>storm</artifactId>
+    <groupId>org.apache.storm</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <packaging>jar</packaging>
+  <artifactId>storm-hive</artifactId>
+  <name>storm-hive</name>
+  <developers>
+    <developer>
+      <id>harshach</id>
+      <name>Sriharsha Chintalapani</name>
+      <email>mail@harsha.io</email>
+    </developer>
+  </developers>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+	    <groupId>org.apache.calcite</groupId>
+	    <artifactId>calcite-core</artifactId>
+      <version>0.9.2-incubating</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
new file mode 100644
index 0000000..849697d
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.hive.common.HiveWriter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+public class HiveBolt extends  BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+    private OutputCollector collector;
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private Boolean kerberosEnabled = false;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveBolt(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal" +
+                                                   " & KerberosKeytab");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+            this.collector = collector;
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer = new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            if(timeToSendHeartBeat.compareAndSet(true, false)) {
+                enableHeartBeatOnAllWriters();
+            }
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+            collector.ack(tuple);
+        } catch(Exception e) {
+            collector.fail(tuple);
+            flushAndCloseWriters();
+            LOG.warn("hive streaming failed. ",e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService toShutdown[] = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+        super.cleanup();
+        LOG.info("Hive Bolt stopped");
+    }
+
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    /**
+     * Closes all writers and removes them from the cache
+     */
+    private void closeAllWriters() {
+        try {
+            //1) Retire writers
+            for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+                entry.getValue().close();
+            }
+            //2) Clear cache
+            allWriters.clear();
+        } catch(Exception e) {
+            LOG.warn("unable to close writers. ", e);
+        }
+    }
+
+    private void flushAndCloseWriters() {
+        try {
+            flushAllWriters();
+        } catch(Exception e) {
+            LOG.warn("unable to flush hive writers. ", e);
+        } finally {
+            closeAllWriters();
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.debug("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: " + ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
new file mode 100644
index 0000000..d516795
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class DelimitedRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(DelimitedRecordHiveMapper.class);
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String[] columnNames;
+    private String timeFormat;
+    private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+    private SimpleDateFormat parseDate;
+
+    public DelimitedRecordHiveMapper() {
+    }
+
+    public DelimitedRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        List<String> tempColumnNamesList = this.columnFields.toList();
+        columnNames = new String[tempColumnNamesList.size()];
+        tempColumnNamesList.toArray(columnNames);
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter){
+        this.fieldDelimiter = delimiter;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new DelimitedInputWriter(columnNames, fieldDelimiter,endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
new file mode 100644
index 0000000..a3b5531
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import java.util.List;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import java.io.Serializable;
+
+import java.io.IOException;
+
+/**
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
+ * to a row in a Hive table.
+ */
+public interface HiveMapper extends Serializable {
+
+    /**
+     * Given an endPoint, returns a RecordWriter with columnNames.
+     *
+     * @param endPoint
+     * @return RecordWriter
+     */
+
+    RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException;
+
+    void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException;
+
+    /**
+     * Given a tuple, return a hive partition values list.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(Tuple tuple);
+
+    /**
+     * Given a tuple, maps to a HiveRecord based on columnFields
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(Tuple tuple);
+
+    /**
+     * Given a TridentTuple, return a hive partition values list.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(TridentTuple tuple);
+
+    /**
+     * Given a TridentTuple, maps to a HiveRecord based on columnFields
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(TridentTuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
new file mode 100644
index 0000000..ce3e475
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class JsonRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHiveMapper.class);
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String timeFormat;
+    private SimpleDateFormat parseDate;
+
+    public JsonRecordHiveMapper() {
+    }
+
+    public JsonRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new StrictJsonWriter(endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
new file mode 100644
index 0000000..d316294
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.Serializable;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+
+public class HiveOptions implements Serializable {
+    protected HiveMapper mapper;
+    protected String databaseName;
+    protected String tableName;
+    protected String metaStoreURI;
+    protected Integer txnsPerBatch = 100;
+    protected Integer maxOpenConnections = 500;
+    protected Integer batchSize = 15000;
+    protected Integer idleTimeout = 0;
+    protected Integer callTimeout = 10000;
+    protected Integer heartBeatInterval = 240;
+    protected Boolean autoCreatePartitions = true;
+    protected String kerberosPrincipal;
+    protected String kerberosKeytab;
+
+    public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
+        this.metaStoreURI = metaStoreURI;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.mapper = mapper;
+    }
+
+    public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
+        this.txnsPerBatch = txnsPerBatch;
+        return this;
+    }
+
+    public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
+        this.maxOpenConnections = maxOpenConnections;
+        return this;
+    }
+
+    public HiveOptions withBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HiveOptions withIdleTimeout(Integer idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    public HiveOptions withCallTimeout(Integer callTimeout) {
+        this.callTimeout = callTimeout;
+        return this;
+    }
+
+    public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+        return this;
+    }
+
+    public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
+        this.autoCreatePartitions = autoCreatePartitions;
+        return this;
+    }
+
+    public HiveOptions withKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+        return this;
+    }
+
+    public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+        return this;
+    }
+
+    public String getMetaStoreURI() {
+        return metaStoreURI;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public HiveMapper getMapper() {
+        return mapper;
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public Integer getCallTimeOut() {
+        return callTimeout;
+    }
+
+    public Integer getHeartBeatInterval() {
+        return heartBeatInterval;
+    }
+
+    public Integer getMaxOpenConnections() {
+        return maxOpenConnections;
+    }
+
+    public Integer getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public Integer getTxnsPerBatch() {
+        return txnsPerBatch;
+    }
+
+    public Boolean getAutoCreatePartitions() {
+        return autoCreatePartitions;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
new file mode 100644
index 0000000..5483b07
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.io.File;
+import java.io.IOException;
+
+public class HiveUtils {
+
+    public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
+        if(partitionVals==null) {
+            return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
+        }
+        return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
+    }
+
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
+                              options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi);
+    }
+
+    public static synchronized UserGroupInformation authenticate(String keytab, String principal)
+    throws AuthenticationFailed {
+        File kfile = new File(keytab);
+        if (!(kfile.isFile() && kfile.canRead())) {
+            throw new IllegalArgumentException("The keytab file: "
+                                               + keytab + " does not exist or cannot be read. "
+                                               + "Please specify a readable keytab file for Kerberos auth.");
+        }
+        try {
+            principal = SecurityUtil.getServerPrincipal(principal, "");
+        } catch (Exception e) {
+            throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
+        }
+        try {
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            return UserGroupInformation.getLoginUser();
+        } catch (IOException e) {
+            throw new AuthenticationFailed("Login failed for principal " + principal, e);
+        }
+    }
+
+     public static class AuthenticationFailed extends Exception {
+         public AuthenticationFailed(String reason, Exception cause) {
+             super("Kerberos Authentication Failed. " + reason, cause);
+         }
+     }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
new file mode 100644
index 0000000..726b8e8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import backtype.storm.tuple.Tuple;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HiveWriter {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(HiveWriter.class);
+
+    private final HiveEndPoint endPoint;
+    private final StreamingConnection connection;
+    private final int txnsPerBatch;
+    private final RecordWriter recordWriter;
+    private TransactionBatch txnBatch;
+    private final ExecutorService callTimeoutPool;
+    private final long callTimeout;
+
+    private long lastUsed; // time of last flush on this writer
+    protected boolean closed; // flag indicating HiveWriter was closed
+    private boolean autoCreatePartitions;
+    private boolean heartBeatNeeded = false;
+    private UserGroupInformation ugi;
+
+    public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+                      boolean autoCreatePartitions, long callTimeout,
+                      ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            this.autoCreatePartitions = autoCreatePartitions;
+            this.callTimeout = callTimeout;
+            this.callTimeoutPool = callTimeoutPool;
+            this.endPoint = endPoint;
+            this.ugi = ugi;
+            this.connection = newConnection(ugi);
+            this.txnsPerBatch = txnsPerBatch;
+            this.recordWriter = mapper.createRecordWriter(endPoint);
+            this.txnBatch = nextTxnBatch(recordWriter);
+            this.closed = false;
+            this.lastUsed = System.currentTimeMillis();
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(RuntimeException e) {
+            throw e;
+        } catch(Exception e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return endPoint.toString();
+    }
+
+    public void setHeartBeatNeeded() {
+        heartBeatNeeded = true;
+    }
+
+    /**
+     * Write data <br />
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public synchronized void write(final byte[] record)
+        throws WriteFailure, InterruptedException {
+        if (closed) {
+            throw new IllegalStateException("This hive streaming writer was closed " +
+                                            "and thus no longer able to write : " + endPoint);
+        }
+        // write the tuple
+        try {
+            LOG.debug("Writing event to {}", endPoint);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.write(record);
+                        return null;
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch(TimeoutException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    /**
+     * Commits the current Txn.
+     * If 'rollToNext' is true, will switch to next Txn in batch or to a
+     *       new TxnBatch if current Txn batch is exhausted
+     * TODO: see what to do when there are errors in each IO call stage
+     */
+    public void flush(boolean rollToNext)
+        throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
+        if(heartBeatNeeded) {
+            heartBeatNeeded = false;
+            heartBeat();
+        }
+        lastUsed = System.currentTimeMillis();
+        try {
+            commitTxn();
+            if(txnBatch.remainingTransactions() == 0) {
+                closeTxnBatch();
+                txnBatch = null;
+                if(rollToNext) {
+                    txnBatch = nextTxnBatch(recordWriter);
+                }
+            }
+            if(rollToNext) {
+                LOG.debug("Switching to next Txn for {}", endPoint);
+                txnBatch.beginNextTransaction(); // does not block
+            }
+        } catch(StreamingException e) {
+            throw new TxnFailure(txnBatch, e);
+        }
+    }
+
+    /** Queues up a heartbeat request on the current and remaining txns using the
+     *  heartbeatThdPool and returns immediately
+     */
+    public void heartBeat() throws InterruptedException {
+        // 1) schedule the heartbeat on one thread in pool
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws Exception {
+                        try {
+                            LOG.debug("Sending heartbeat on batch " + txnBatch);
+                            txnBatch.heartbeat();
+                        } catch (StreamingException e) {
+                            LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                        }
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+            // Suppressing exceptions as we don't care for errors on heartbeats
+        }
+    }
+
+    /**
+     * Close the Transaction Batch and connection
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close() throws IOException, InterruptedException {
+        closeTxnBatch();
+        closeConnection();
+        closed = true;
+    }
+
+    private void closeConnection() throws InterruptedException {
+        LOG.info("Closing connection to end point : {}", endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        connection.close(); // could block
+                        return null;
+                    }
+                });
+        } catch(Exception e) {
+            LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on connection close
+        }
+    }
+
+    private void commitTxn() throws CommitFailure, InterruptedException {
+        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        txnBatch.commit(); // could block
+                        return null;
+                    }
+                });
+        } catch (StreamingException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch (TimeoutException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    private StreamingConnection newConnection(final UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            return  callWithTimeout(new CallRunner<StreamingConnection>() {
+                    @Override
+                    public StreamingConnection call() throws Exception {
+                        return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new ConnectFailure(endPoint, e);
+        } catch(TimeoutException e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+        throws InterruptedException, TxnBatchFailure {
+        LOG.debug("Fetching new Txn Batch for {}", endPoint);
+        TransactionBatch batch = null;
+        try {
+            batch = callWithTimeout(new CallRunner<TransactionBatch>() {
+                @Override
+                public TransactionBatch call() throws Exception {
+                    return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+                }
+            });
+        LOG.debug("Acquired {}. Switching to first txn", batch);
+        batch.beginNextTransaction();
+        } catch(TimeoutException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        } catch(StreamingException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        }
+        return batch;
+    }
+
+    private void closeTxnBatch() throws  InterruptedException {
+        try {
+            LOG.debug("Closing Txn Batch {}", txnBatch);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws Exception {
+                        if(txnBatch != null) {
+                            txnBatch.close(); // could block
+                        }
+                        return null;
+                    }
+                });
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(Exception e) {
+            LOG.warn("Error closing txn batch "+ txnBatch, e);
+        }
+    }
+
+    /**
+     * Aborts the current Txn; abort errors are logged and suppressed.
+     * @throws InterruptedException if interrupted while waiting on the abort call
+     */
+    public void abort() throws InterruptedException {
+        abortTxn();
+    }
+
+    private void abortTxn() throws InterruptedException {
+        LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.abort(); // could block
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+        } catch (Exception e) {
+            LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on abort
+        }
+    }
+
+
+    /**
+     * If the current thread has been interrupted, then throws an
+     * exception.
+     * @throws InterruptedException
+     */
+    private static void checkAndThrowInterruptedException()
+        throws InterruptedException {
+        if (Thread.interrupted()) {
+            throw new InterruptedException("Timed out before Hive call was made. "
+                                           + "Your callTimeout might be set too low or Hive calls are "
+                                           + "taking too long.");
+        }
+    }
+
+    /**
+     * Execute the callable on a separate thread and wait up to callTimeout
+     * milliseconds for completion. On timeout the task is cancelled and a
+     * TimeoutException is thrown.
+     */
+    private <T> T callWithTimeout(final CallRunner<T> callRunner)
+        throws TimeoutException, StreamingException, InterruptedException {
+        Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    return callRunner.call();
+                }
+            });
+        try {
+            if (callTimeout > 0) {
+                return future.get(callTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                return future.get();
+            }
+        } catch (TimeoutException eT) {
+            future.cancel(true);
+            throw eT;
+        } catch (ExecutionException e1) {
+            Throwable cause = e1.getCause();
+            if (cause instanceof IOException) {
+                throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+            } else if (cause instanceof StreamingException) {
+                throw (StreamingException) cause;
+            } else if (cause instanceof InterruptedException) {
+                throw (InterruptedException) cause;
+            } else if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else if (cause instanceof TimeoutException) {
+                throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+            } else {
+                throw new RuntimeException(e1);
+            }
+        }
+    }
+
+    public long getLastUsed() {
+        return lastUsed;
+    }
+
+    private byte[] generateRecord(Tuple tuple) {
+        StringBuilder buf = new StringBuilder();
+        for (Object o: tuple.getValues()) {
+            buf.append(o);
+            buf.append(",");
+        }
+        return buf.toString().getBytes();
+    }
+
+    /**
+     * Simple interface whose <tt>call</tt> method is run by
+     * {@link #callWithTimeout} on a thread from the callTimeoutPool.
+     * @param <T>
+     */
+    private interface CallRunner<T> {
+        T call() throws Exception;
+    }
+
+    public static class Failure extends Exception {
+        public Failure(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    public static class WriteFailure extends Failure {
+        public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+            super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+        }
+    }
+
+    public static class CommitFailure extends Failure {
+        public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+            super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
+        }
+    }
+
+    public static class ConnectFailure extends Failure {
+        public ConnectFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed connecting to EndPoint " + ep, cause);
+        }
+    }
+
+    public static class TxnBatchFailure extends Failure {
+        public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+        }
+    }
+
+    public static class TxnFailure extends Failure {
+        public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+            super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+        }
+    }
+}
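
The failure classes above are what callers are expected to handle; a minimal sketch (not part of this patch) of driving flush() and falling back to abort()/close() follows. The helper name and the choice to rethrow as IOException are illustrative assumptions.

    import java.io.IOException;
    import org.apache.storm.hive.common.HiveWriter;

    // Hypothetical caller-side handling of HiveWriter failures.
    class HiveWriterFlushExample {
        static void flushOrAbort(HiveWriter writer) throws IOException, InterruptedException {
            try {
                writer.flush(true);   // commit the open txn and switch to the next one
            } catch (HiveWriter.Failure e) {
                writer.abort();       // abort the current txn; abort errors are suppressed
                writer.close();       // release the txn batch and streaming connection
                throw new IOException("Hive flush failed for writer", e);
            }
        }
    }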

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
new file mode 100644
index 0000000..6050aa8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HiveState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    private Boolean kerberosEnabled = false;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveState(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+
+    @Override
+    public void beginCommit(Long txId) {
+    }
+
+    @Override
+    public void commit(Long txId) {
+    }
+
+    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
+                                                   " & KerberosKeytab");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer= new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            writeTuples(tuples);
+        } catch (Exception e) {
+            abortAndCloseWriters();
+            LOG.warn("hive streaming failed.",e);
+            throw new FailedException(e);
+        }
+    }
+
+    private void writeTuples(List<TridentTuple> tuples)
+        throws Exception {
+        if(timeToSendHeartBeat.compareAndSet(true, false)) {
+            enableHeartBeatOnAllWriters();
+        }
+        for (TridentTuple tuple : tuples) {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+        }
+    }
+
+    private void abortAndCloseWriters() {
+        try {
+            abortAllWriters();
+            closeAllWriters();
+        } catch(InterruptedException e) {
+            LOG.warn("unable to close hive connections. ", e);
+        } catch(IOException ie) {
+            LOG.warn("unable to close hive connections. ", ie);
+        }
+    }
+
+    /**
+     * Abort the current Txn on all writers
+     */
+    private void abortAllWriters() throws InterruptedException {
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().abort();
+        }
+    }
+
+
+    /**
+     * Closes all writers and removes them from the cache
+     */
+    private void closeAllWriters() throws InterruptedException, IOException {
+        //1) Retire writers
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().close();
+        }
+        //2) Clear cache
+        allWriters.clear();
+    }
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.info("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+
+    }
+
+
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService[] toShutdown = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+    }
+
+}
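
HiveState's batching, heartbeat, idle-retirement and connection-cap behaviour is driven entirely by the HiveOptions it is constructed with. A rough configuration sketch follows; the HiveOptions constructor and builder methods (and the DelimitedRecordHiveMapper calls) are assumed from the other files in this patch and may differ slightly, and all names and values are placeholders.

    import backtype.storm.tuple.Fields;
    import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
    import org.apache.storm.hive.common.HiveOptions;

    // Hypothetical HiveOptions setup matching the getters HiveState reads above.
    class HiveStateOptionsExample {
        static HiveOptions buildOptions() {
            DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("id", "name", "phone", "street"))
                .withPartitionFields(new Fields("city", "state"));
            return new HiveOptions("thrift://localhost:9083", "default", "stock_prices", mapper)
                .withTxnsPerBatch(10)          // txns per TransactionBatch fetched by each writer
                .withBatchSize(1000)           // tuples buffered before flushAllWriters()
                .withIdleTimeout(60000)        // compared against ms since last use in retireIdleWriters()
                .withMaxOpenConnections(50)    // cap before idle/eldest writers are retired
                .withHeartBeatInterval(240);   // seconds; multiplied by 1000 in setupHeartBeatTimer()
        }
    }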

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
new file mode 100644
index 0000000..8f3b9e9
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -0,0 +1,31 @@
+package org.apache.storm.hive.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+
+
+public class HiveStateFactory implements StateFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class);
+    private HiveOptions options;
+
+    public HiveStateFactory(){}
+
+    public HiveStateFactory withOptions(HiveOptions options){
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+        HiveState state = new HiveState(this.options);
+        state.prepare(conf, metrics, partitionIndex, numPartitions);
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
new file mode 100644
index 0000000..b0b32f1
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class HiveUpdater extends BaseStateUpdater<HiveState>{
+    @Override
+    public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}
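
Putting the two classes together, a Trident stream is persisted to Hive by pairing HiveStateFactory with HiveUpdater in a partitionPersist call. The sketch below is illustrative only: the spout, HiveOptions and field names are assumed to be supplied by the caller.

    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import storm.trident.Stream;
    import storm.trident.TridentTopology;
    import storm.trident.spout.IBatchSpout;
    import org.apache.storm.hive.common.HiveOptions;
    import org.apache.storm.hive.trident.HiveStateFactory;
    import org.apache.storm.hive.trident.HiveUpdater;

    // Hypothetical wiring of HiveState into a Trident topology.
    class HiveTridentWiringExample {
        static StormTopology build(IBatchSpout spout, HiveOptions options) {
            TridentTopology topology = new TridentTopology();
            Stream stream = topology.newStream("hiveTridentSpout", spout);
            // HiveUpdater forwards each Trident batch to HiveState.updateState()
            stream.partitionPersist(new HiveStateFactory().withOptions(options),
                                    new Fields("id", "name", "phone", "street", "city", "state"),
                                    new HiveUpdater(), new Fields());
            return topology.build();
        }
    }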

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
new file mode 100644
index 0000000..d492819
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.thrift.TException;
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveSetupUtil {
+    public static class RawFileSystem extends RawLocalFileSystem {
+        private static final URI NAME;
+        static {
+            try {
+                NAME = new URI("raw:///");
+            } catch (URISyntaxException se) {
+                throw new IllegalArgumentException("bad uri", se);
+            }
+        }
+
+        @Override
+        public URI getUri() {
+            return NAME;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            File file = pathToFile(path);
+            if (!file.exists()) {
+                throw new FileNotFoundException("Can't find " + path);
+            }
+            // get close enough
+            short mod = 0;
+            if (file.canRead()) {
+                mod |= 0444;
+            }
+            if (file.canWrite()) {
+                mod |= 0200;
+            }
+            if (file.canExecute()) {
+                mod |= 0111;
+            }
+            ShimLoader.getHadoopShims();
+            return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+                                  file.lastModified(), file.lastModified(),
+                                  FsPermission.createImmutable(mod), "owen", "users", path);
+        }
+    }
+
+    private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+    public static HiveConf getHiveConf() {
+        HiveConf conf = new HiveConf();
+        // String metastoreDBLocation = "jdbc:derby:databaseName=/tmp/metastore_db;create=true";
+        // conf.set("javax.jdo.option.ConnectionDriverName","org.apache.derby.jdbc.EmbeddedDriver");
+        // conf.set("javax.jdo.option.ConnectionURL",metastoreDBLocation);
+        conf.set("fs.raw.impl", RawFileSystem.class.getName());
+        conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+        return conf;
+    }
+
+    public static void createDbAndTable(HiveConf conf, String databaseName,
+                                        String tableName, List<String> partVals,
+                                        String[] colNames, String[] colTypes,
+                                        String[] partNames, String dbLocation)
+        throws Exception {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            Database db = new Database();
+            db.setName(databaseName);
+            db.setLocationUri(dbLocation);
+            client.createDatabase(db);
+
+            Table tbl = new Table();
+            tbl.setDbName(databaseName);
+            tbl.setTableName(tableName);
+            tbl.setTableType(TableType.MANAGED_TABLE.toString());
+            StorageDescriptor sd = new StorageDescriptor();
+            sd.setCols(getTableColumns(colNames, colTypes));
+            sd.setNumBuckets(1);
+            sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
+            if(partNames!=null && partNames.length!=0) {
+                tbl.setPartitionKeys(getPartitionKeys(partNames));
+            }
+
+            tbl.setSd(sd);
+
+            sd.setBucketCols(new ArrayList<String>(2));
+            sd.setSerdeInfo(new SerDeInfo());
+            sd.getSerdeInfo().setName(tbl.getTableName());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+            sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
+            sd.setInputFormat(OrcInputFormat.class.getName());
+            sd.setOutputFormat(OrcOutputFormat.class.getName());
+
+            Map<String, String> tableParams = new HashMap<String, String>();
+            tbl.setParameters(tableParams);
+            client.createTable(tbl);
+            try {
+                if(partVals!=null && partVals.size() > 0) {
+                    addPartition(client, tbl, partVals);
+                }
+            } catch(AlreadyExistsException e) {
+                // partition is already present; nothing to do
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    // delete db and all tables in it
+    public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
+                client.dropTable(databaseName, table, true, true);
+            }
+            client.dropDatabase(databaseName);
+        } catch (TException e) {
+            // best-effort cleanup for tests; ignore failures while dropping
+        } finally {
+            client.close();
+        }
+    }
+
+    private static void addPartition(IMetaStoreClient client, Table tbl
+                                     , List<String> partValues)
+        throws IOException, TException {
+        Partition part = new Partition();
+        part.setDbName(tbl.getDbName());
+        part.setTableName(tbl.getTableName());
+        StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
+        sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys(), partValues));
+        part.setSd(sd);
+        part.setValues(partValues);
+        client.add_partition(part);
+    }
+
+    private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
+        if(partKeys.size()!=partVals.size()) {
+            throw new IllegalArgumentException("Partition values " + partVals +
+                                               " do not match the partition keys of the table: " + partKeys);
+        }
+        StringBuffer buff = new StringBuffer(partKeys.size()*20);
+        int i=0;
+        for(FieldSchema schema : partKeys) {
+            buff.append(schema.getName());
+            buff.append("=");
+            buff.append(partVals.get(i));
+            if(i!=partKeys.size()-1) {
+                buff.append(Path.SEPARATOR);
+            }
+            ++i;
+        }
+        return buff.toString();
+    }
+
+    private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i<colNames.length; ++i) {
+            fields.add(new FieldSchema(colNames[i], colTypes[i], ""));
+        }
+        return fields;
+    }
+
+    private static List<FieldSchema> getPartitionKeys(String[] partNames) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i < partNames.length; ++i) {
+           fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
+        }
+        return fields;
+    }
+
+}
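
For reference, a hypothetical test setup/teardown using the helpers above might look like the following; the database name, table name, columns and warehouse location are placeholders.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Illustrative usage of HiveSetupUtil in a test fixture.
    class HiveSetupUtilExample {
        static void createTestTable() throws Exception {
            HiveConf conf = HiveSetupUtil.getHiveConf();
            List<String> partVals = Arrays.asList("sunnyvale", "ca");
            String[] colNames = {"id", "name", "phone", "street"};
            String[] colTypes = {"int", "string", "string", "string"};
            String[] partNames = {"city", "state"};
            HiveSetupUtil.createDbAndTable(conf, "testdb", "test_table", partVals,
                                           colNames, colTypes, partNames, "raw:///tmp/warehouse");
        }

        static void dropTestTable() throws Exception {
            HiveSetupUtil.dropDB(HiveSetupUtil.getHiveConf(), "testdb");
        }
    }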