Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/25 19:52:49 UTC

[01/13] storm git commit: STORM-539. Storm hive bolt and trident state.

Repository: storm
Updated Branches:
  refs/heads/master 0a3a0aabc -> 14a302f54


http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
new file mode 100644
index 0000000..e9ecbd0
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopology {
+    static final String USER_SPOUT_ID = "user-spout";
+    static final String BOLT_ID = "my-hive-bolt";
+    static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] colNames = {"id","name","phone","street","city","state"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("shutting down cluster");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyName] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}
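
Note: HiveBolt writes through the Hive streaming ingest API, which generally expects the target table to be bucketed, stored as ORC, and marked transactional. The sketch below shows one way such a table could be created with the Hive Driver API before running this topology; the database name, table name, and bucket count are illustrative assumptions, not part of this patch.

    import org.apache.hadoop.hive.cli.CliSessionState;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class CreateStreamingTable {
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();
            SessionState.start(new CliSessionState(conf));
            Driver driver = new Driver(conf);
            // Streaming ingest expects a bucketed, ORC-backed, transactional table.
            driver.run("CREATE TABLE testdb.test_table ("
                    + "id INT, name STRING, phone STRING, street STRING, "
                    + "city STRING, state STRING) "
                    + "CLUSTERED BY (id) INTO 4 BUCKETS "
                    + "STORED AS ORC TBLPROPERTIES ('transactional'='true')");
        }
    }

HiveTopology could then be pointed at such a table, for example with metaStoreURI thrift://localhost:9083, dbName testdb and tblName test_table (again, illustrative values).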

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
new file mode 100644
index 0000000..c3197c2
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopologyPartitioned {
+    static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
+    static final String BOLT_ID = "my-hive-bolt-partitioned";
+    static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("shutting down cluster");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyName] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                Utils.sleep(1000);
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}
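
For the partitioned variant, the fields passed to withPartitionFields must correspond to Hive partition columns, so only the remaining four fields appear in the table's column list. A hypothetical DDL string, mirroring the streaming-table requirements sketched after HiveTopology.java above, could look like this (illustrative only, not part of this patch):

    // Partition columns mirror withPartitionFields(new Fields("city", "state")).
    String ddl = "CREATE TABLE testdb.test_table ("
               + "id INT, name STRING, phone STRING, street STRING) "
               + "PARTITIONED BY (city STRING, state STRING) "
               + "CLUSTERED BY (id) INTO 4 BUCKETS "
               + "STORED AS ORC TBLPROPERTIES ('transactional'='true')";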

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
new file mode 100644
index 0000000..e7e875e
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import junit.framework.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+
+import org.apache.hive.hcatalog.streaming.*;
+
+public class TestHiveBolt {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table";
+    final static String dbName1 = "testdb1";
+    final static String tblName1 = "test_table1";
+    final static String PART1_NAME = "city";
+    final static String PART2_NAME = "state";
+    final static String[] partNames = { PART1_NAME, PART2_NAME };
+    final String partitionVals = "sunnyvale,ca";
+    private static final String COL1 = "id";
+    private static final String COL2 = "msg";
+    final String[] colNames = {COL1,COL2};
+    final String[] colNames1 = {COL2,COL1};
+    private String[] colTypes = {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    private final HiveConf conf;
+    private final Driver driver;
+    private final int port;
+    final String metaStoreURI;
+    private String dbLocation;
+    private Config config = new Config();
+    private HiveBolt bolt;
+    private final static boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+    @Mock
+    private IOutputCollector collector;
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+
+    public TestHiveBolt() throws Exception {
+        port=9083;
+        dbLocation = "";
+        //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true";
+        metaStoreURI = null;
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+
+        // driver.init();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        HiveSetupUtil.dropDB(conf, dbName);
+        if(WINDOWS) {
+            dbLocation = dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        } else {
+            dbLocation = "raw://" + dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        }
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals.split(",")),
+                colNames, colTypes, partNames, dbLocation);
+        System.out.println("done");
+    }
+
+    @Test
+    public void testEndpointConnection() throws Exception {
+        // 1) Basic
+        HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+                                              , Arrays.asList(partitionVals.split(",")));
+        StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+        connection.close();
+        // 2) Leave partition unspecified
+        endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+        endPt.newConnection(false, null).close(); // should not throw
+    }
+
+    @Test
+    public void testWithByteArrayIdandMessage()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName,dbName,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 4);
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testWithoutPartitions()
+        throws Exception {
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,null, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2)
+            .withAutoCreatePartitions(false);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        bolt.cleanup();
+        checkRecordCountInTable(tblName1, dbName1, 4);
+    }
+
+    @Test
+    public void testWithTimeformat()
+        throws Exception {
+        String[] partNames1 = {"date"};
+        String timeFormat = "yyyy/MM/dd";
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,partNames1, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField(timeFormat);
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        Date d = new Date();
+        SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
+        String today=parseDate.format(d.getTime());
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 2; i++) {
+            Tuple tuple = generateTestTuple(id,msg,null,null);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testData()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testJsonWriter()
+        throws Exception {
+        // A JSON record's columns don't need to be in the same order
+        // as the columns of the Hive table.
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testMultiPartitionTuples()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(10);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 1;
+        String msg = "test";
+        String city = "San Jose";
+        String state = "CA";
+        checkRecordCountInTable(tblName,dbName,0);
+        for(int i=0; i < 100; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 100);
+        bolt.cleanup();
+    }
+
+    private void checkRecordCountInTable(String tableName,String dbName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(tableName,dbName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String tableName,String dbName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+    private void checkDataWritten(String tableName,String dbName,String... row)
+        throws CommandNeedRetryException, IOException {
+        ArrayList<String> results = listRecordsInTable(tableName,dbName);
+        for(int i = 0; i < row.length && results.size() > 0; i++) {
+            String resultRow = results.get(i).replace("\t",",");
+            System.out.println(resultRow);
+            assertEquals(row[i],resultRow);
+        }
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                                             new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg","city","state");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
new file mode 100644
index 0000000..63b1949
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.storm.hive.bolt.HiveSetupUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+
+public class TestHiveWriter {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table2";
+
+    public static final String PART1_NAME = "city";
+    public static final String PART2_NAME = "state";
+    public static final String[] partNames = { PART1_NAME, PART2_NAME };
+    final String[] partitionVals = {"sunnyvale","ca"};
+    final String[] colNames = {"id","msg"};
+    private String[] colTypes = { "int", "string" };
+    private final int port;
+    private final String metaStoreURI;
+    private final HiveConf conf;
+    private ExecutorService callTimeoutPool;
+    private final Driver driver;
+    int timeout = 10000; // msec
+    UserGroupInformation ugi = null;
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+    public TestHiveWriter() throws Exception {
+        port = 9083;
+        metaStoreURI = null;
+        int callTimeoutPoolSize = 1;
+        callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+                                                       new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+        // 1) Start metastore
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+
+        if(metaStoreURI!=null) {
+            conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+        }
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+        driver.init();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // 1) Setup tables
+        HiveSetupUtil.dropDB(conf, dbName);
+        String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals),
+                                       colNames,colTypes, partNames, dbLocation);
+    }
+
+    @Test
+    public void testInstantiate() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           ,callTimeoutPool, mapper, ugi);
+        writer.close();
+    }
+
+    @Test
+    public void testWriteBasic() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        writeTuples(writer,mapper,3);
+        writer.flush(false);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    @Test
+    public void testWriteMultiFlush() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        Tuple tuple = generateTestTuple("1","abc");
+        writer.write(mapper.mapRecord(tuple));
+        checkRecordCountInTable(dbName,tblName,0);
+        writer.flush(true);
+
+        tuple = generateTestTuple("2","def");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+
+        tuple = generateTestTuple("3","ghi");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                              new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+    }
+
+    private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
+            throws HiveWriter.WriteFailure, InterruptedException {
+        Integer id = 100;
+        String msg = "test-123";
+        for (int i = 1; i <= count; i++) {
+            Tuple tuple = generateTestTuple(id,msg);
+            writer.write(mapper.mapRecord(tuple));
+        }
+    }
+
+    private void checkRecordCountInTable(String dbName,String tableName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(dbName,tableName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String dbName,String tableName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
new file mode 100644
index 0000000..bc607f3
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.trident;
+
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.task.TopologyContext;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+
+public class TridentHiveTopology {
+    public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("hiveTridentspout1",spout);
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Fields hiveFields = new Fields("id","name","phone","street","city","state");
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (keytab != null && principal != null) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withIdleTimeout(10)
+                .withCallTimeout(30000)
+                .withKerberosKeytab((String)keytab)
+                .withKerberosPrincipal((String)principal);
+        } else  {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withCallTimeout(30000)
+                .withIdleTimeout(10);
+        }
+        StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+        TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+        return topology.build();
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static void main(String[] args) {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if(args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            System.out.println("waiting for 60 seconds");
+            waitForSeconds(60);
+            System.out.println("killing topology");
+            cluster.killTopology("tridentHiveTopology");
+            System.out.println("shutting down cluster");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length == 4) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else if (args.length == 6) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else {
+            System.out.println("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyName] [keytab file] [principal name]");
+        }
+    }
+
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+            new Values("1","user1","123456","street1","sunnyvale","ca"),
+            new Values("2","user2","123456","street2","sunnyvale","ca"),
+            new Values("3","user3","123456","street3","san jose","ca"),
+            new Values("4","user4","123456","street4","san jose","ca"),
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("id","name","phone","street","city","state");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if(batch == null){
+                batch = new ArrayList<List<Object>>();
+                if(index>=outputs.length && cycle) {
+                    index = 0;
+                }
+                for(int i=0; i < maxBatchSize; index++, i++) {
+                    if(index == outputs.length){
+                        index=0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for(List<Object> list : batch){
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5645b9a..603f772 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,7 @@
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>
+        <module>external/storm-hive</module>
     </modules>
 
     <scm>
@@ -212,7 +213,8 @@
         <conjure.version>2.1.3</conjure.version>
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
-
+        <hive.version>0.13.0</hive.version>
+        <hadoop.version>2.6.0</hadoop.version>
     </properties>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 5b687b6..25e064b 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -103,6 +103,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive/target</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
 
     </fileSets>


[10/13] storm git commit: Merge remote-tracking branch 'origin/master' into STORM-539-V2

Posted by sr...@apache.org.
Merge remote-tracking branch 'origin/master' into STORM-539-V2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58a34790
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58a34790
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58a34790

Branch: refs/heads/master
Commit: 58a3479032f0e7f2881df8f6fffe06eb984a8c29
Parents: 54b6a69 64d7ac6
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Feb 23 14:32:14 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Feb 23 14:32:14 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   14 +
 DEVELOPER.md                                    |   24 +-
 README.markdown                                 |    3 +
 SECURITY.md                                     |   10 +-
 STORM-UI-REST-API.md                            |    3 +-
 bin/storm-config.cmd                            |   14 +-
 bin/storm.cmd                                   |   33 +-
 conf/defaults.yaml                              |    2 +-
 docs/documentation/Trident-API-Overview.md      |    2 +-
 .../jvm/storm/starter/BasicDRPCTopology.java    |    3 +-
 external/storm-hdfs/pom.xml                     |   18 +-
 .../storm/hdfs/bolt/HdfsFileTopology.java       |    6 +-
 .../storm/hdfs/bolt/SequenceFileTopology.java   |    4 +-
 .../storm/hdfs/trident/TridentFileTopology.java |    2 +-
 .../hdfs/trident/TridentSequenceTopology.java   |    6 +-
 external/storm-jdbc/LICENSE                     |  202 +
 external/storm-jdbc/README.md                   |  240 +
 external/storm-jdbc/pom.xml                     |  125 +
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |   57 +
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   71 +
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |   76 +
 .../org/apache/storm/jdbc/common/Column.java    |  111 +
 .../apache/storm/jdbc/common/JdbcClient.java    |  228 +
 .../java/org/apache/storm/jdbc/common/Util.java |   74 +
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |   26 +
 .../apache/storm/jdbc/mapper/JdbcMapper.java    |   33 +
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |   46 +
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |   92 +
 .../storm/jdbc/trident/state/JdbcQuery.java     |   40 +
 .../storm/jdbc/trident/state/JdbcState.java     |  145 +
 .../jdbc/trident/state/JdbcStateFactory.java    |   40 +
 .../storm/jdbc/trident/state/JdbcUpdater.java   |   32 +
 .../storm/jdbc/common/JdbcClientTest.java       |   88 +
 .../org/apache/storm/jdbc/spout/UserSpout.java  |   90 +
 .../jdbc/topology/AbstractUserTopology.java     |  106 +
 .../jdbc/topology/UserPersistanceTopology.java  |   48 +
 .../UserPersistanceTridentTopology.java         |   61 +
 external/storm-jdbc/src/test/sql/test.sql       |    1 +
 .../src/jvm/storm/kafka/PartitionManager.java   |    5 +-
 external/storm-redis/LICENSE                    |  202 +
 external/storm-redis/README.md                  |  137 +
 external/storm-redis/pom.xml                    |   65 +
 .../storm/redis/bolt/AbstractRedisBolt.java     |   67 +
 .../trident/mapper/TridentTupleMapper.java      |   27 +
 .../trident/state/RedisClusterMapState.java     |  294 +
 .../redis/trident/state/RedisClusterState.java  |   80 +
 .../trident/state/RedisClusterStateQuerier.java |   78 +
 .../trident/state/RedisClusterStateUpdater.java |   76 +
 .../redis/trident/state/RedisMapState.java      |  323 +
 .../storm/redis/trident/state/RedisState.java   |   83 +
 .../redis/trident/state/RedisStateQuerier.java  |   70 +
 .../state/RedisStateSetCountQuerier.java        |   74 +
 .../trident/state/RedisStateSetUpdater.java     |   80 +
 .../redis/trident/state/RedisStateUpdater.java  |   75 +
 .../redis/util/config/JedisClusterConfig.java   |   82 +
 .../redis/util/config/JedisPoolConfig.java      |   97 +
 .../util/container/JedisClusterContainer.java   |   47 +
 .../JedisCommandsContainerBuilder.java          |   38 +
 .../JedisCommandsInstanceContainer.java         |   25 +
 .../redis/util/container/JedisContainer.java    |   65 +
 .../storm/redis/topology/LookupWordCount.java   |  127 +
 .../redis/topology/PersistentWordCount.java     |  117 +
 .../storm/redis/topology/WordCounter.java       |   58 +
 .../apache/storm/redis/topology/WordSpout.java  |   88 +
 .../storm/redis/trident/PrintFunction.java      |   40 +
 .../redis/trident/WordCountTridentRedis.java    |   97 +
 .../trident/WordCountTridentRedisCluster.java   |  103 +
 .../WordCountTridentRedisClusterMap.java        |  101 +
 .../redis/trident/WordCountTridentRedisMap.java |   95 +
 .../redis/trident/WordCountTupleMapper.java     |   16 +
 pom.xml                                         |   11 +-
 storm-core/pom.xml                              |    7 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |   64 -
 storm-core/src/clj/backtype/storm/clojure.clj   |    2 +-
 storm-core/src/clj/backtype/storm/config.clj    |    3 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |    6 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   26 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   26 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  191 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   16 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   70 +-
 .../src/clj/backtype/storm/messaging/local.clj  |    2 +-
 storm-core/src/clj/backtype/storm/tuple.clj     |    4 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   20 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |    1 +
 storm-core/src/clj/backtype/storm/util.clj      |    3 +
 .../storm/drpc/DRPCInvocationsClient.java       |    6 +
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |   14 +-
 .../jvm/backtype/storm/drpc/ReturnResults.java  |    8 +-
 .../storm/generated/AlreadyAliveException.java  |  149 +-
 .../storm/generated/AuthorizationException.java |  149 +-
 .../src/jvm/backtype/storm/generated/Bolt.java  |  194 +-
 .../jvm/backtype/storm/generated/BoltStats.java |  908 +-
 .../storm/generated/ClusterSummary.java         |  339 +-
 .../storm/generated/ComponentCommon.java        |  426 +-
 .../storm/generated/ComponentObject.java        |   86 +-
 .../backtype/storm/generated/Credentials.java   |  220 +-
 .../storm/generated/DRPCExecutionException.java |  149 +-
 .../backtype/storm/generated/DRPCRequest.java   |  185 +-
 .../storm/generated/DistributedRPC.java         |  529 +-
 .../generated/DistributedRPCInvocations.java    | 1199 ++-
 .../jvm/backtype/storm/generated/ErrorInfo.java |  300 +-
 .../backtype/storm/generated/ExecutorInfo.java  |  198 +-
 .../storm/generated/ExecutorSpecificStats.java  |   72 +-
 .../backtype/storm/generated/ExecutorStats.java |  486 +-
 .../storm/generated/ExecutorSummary.java        |  371 +-
 .../storm/generated/GetInfoOptions.java         |  166 +-
 .../storm/generated/GlobalStreamId.java         |  185 +-
 .../jvm/backtype/storm/generated/Grouping.java  |  163 +-
 .../generated/InvalidTopologyException.java     |  149 +-
 .../backtype/storm/generated/JavaObject.java    |  239 +-
 .../backtype/storm/generated/JavaObjectArg.java |  108 +-
 .../backtype/storm/generated/KillOptions.java   |  176 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 9177 +++++++++++++-----
 .../storm/generated/NotAliveException.java      |  149 +-
 .../backtype/storm/generated/NullStruct.java    |  112 +-
 .../storm/generated/NumErrorsChoice.java        |    3 +-
 .../storm/generated/RebalanceOptions.java       |  348 +-
 .../storm/generated/ShellComponent.java         |  202 +-
 .../jvm/backtype/storm/generated/SpoutSpec.java |  194 +-
 .../backtype/storm/generated/SpoutStats.java    |  614 +-
 .../storm/generated/StateSpoutSpec.java         |  194 +-
 .../backtype/storm/generated/StormTopology.java |  410 +-
 .../backtype/storm/generated/StreamInfo.java    |  249 +-
 .../backtype/storm/generated/SubmitOptions.java |  208 +-
 .../storm/generated/SupervisorSummary.java      |  309 +-
 .../backtype/storm/generated/TopologyInfo.java  |  609 +-
 .../storm/generated/TopologyInitialStatus.java  |    3 +-
 .../storm/generated/TopologySummary.java        |  486 +-
 .../storm/messaging/ConnectionWithStatus.java   |   32 +
 .../backtype/storm/messaging/netty/Client.java  |  712 +-
 .../messaging/netty/SaslStormClientHandler.java |    5 +-
 .../backtype/storm/messaging/netty/Server.java  |  182 +-
 .../netty/StormClientPipelineFactory.java       |    5 +-
 .../security/auth/SimpleTransportPlugin.java    |    2 +-
 .../trident/drpc/ReturnResultsReducer.java      |    4 +-
 .../trident/spout/RichSpoutBatchExecutor.java   |    1 +
 storm-core/src/py/storm/DistributedRPC-remote   |   35 +-
 storm-core/src/py/storm/DistributedRPC.py       |   38 +-
 .../py/storm/DistributedRPCInvocations-remote   |   43 +-
 .../src/py/storm/DistributedRPCInvocations.py   |   95 +-
 storm-core/src/py/storm/Nimbus-remote           |  111 +-
 storm-core/src/py/storm/Nimbus.py               |  640 +-
 storm-core/src/py/storm/constants.py            |    6 +-
 storm-core/src/py/storm/ttypes.py               |  420 +-
 .../test/clj/backtype/storm/clojure_test.clj    |    9 +-
 .../test/clj/backtype/storm/drpc_test.clj       |    8 +-
 .../test/clj/backtype/storm/grouping_test.clj   |    9 +-
 .../clj/backtype/storm/integration_test.clj     |   10 +-
 .../storm/messaging/netty_integration_test.clj  |    5 +-
 .../storm/messaging/netty_unit_test.clj         |   73 +-
 .../test/clj/backtype/storm/messaging_test.clj  |    6 +-
 .../test/clj/backtype/storm/metrics_test.clj    |    8 +-
 .../test/clj/backtype/storm/multilang_test.clj  |    6 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |   21 +-
 .../scheduler/multitenant_scheduler_test.clj    |    4 +-
 .../test/clj/backtype/storm/scheduler_test.clj  |    4 +-
 .../storm/security/auth/AuthUtils_test.clj      |    6 +-
 .../backtype/storm/security/auth/auth_test.clj  |   13 +-
 .../storm/security/auth/drpc_auth_test.clj      |    7 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   10 +-
 .../clj/backtype/storm/subtopology_test.clj     |    9 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   16 +-
 .../test/clj/backtype/storm/tick_tuple_test.clj |    7 +-
 .../clj/backtype/storm/transactional_test.clj   |   14 +-
 .../test/clj/backtype/storm/worker_test.clj     |   37 +
 storm-dist/binary/src/main/assembly/binary.xml  |   10 +-
 167 files changed, 21213 insertions(+), 7411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/58a34790/pom.xml
----------------------------------------------------------------------


[04/13] storm git commit: STORM-539. Storm hive bolt and trident state.

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
new file mode 100644
index 0000000..e9ecbd0
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopology {
+    static final String USER_SPOUT_ID = "user-spout";
+    static final String BOLT_ID = "my-hive-bolt";
+    static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] colNames = {"id","name","phone","street","city","state"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
new file mode 100644
index 0000000..c3197c2
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopologyPartitioned {
+    static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
+    static final String BOLT_ID = "my-hive-bolt-partitioned";
+    static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                Utils.sleep(1000);
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
new file mode 100644
index 0000000..e7e875e
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import junit.framework.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+
+import org.apache.hive.hcatalog.streaming.*;
+
+public class TestHiveBolt {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table";
+    final static String dbName1 = "testdb1";
+    final static String tblName1 = "test_table1";
+    final static String PART1_NAME = "city";
+    final static String PART2_NAME = "state";
+    final static String[] partNames = { PART1_NAME, PART2_NAME };
+    final String partitionVals = "sunnyvale,ca";
+    private static final String COL1 = "id";
+    private static final String COL2 = "msg";
+    final String[] colNames = {COL1,COL2};
+    final String[] colNames1 = {COL2,COL1};
+    private String[] colTypes = {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    private final HiveConf conf;
+    private final Driver driver;
+    private final int port ;
+    final String metaStoreURI;
+    private String dbLocation;
+    private Config config = new Config();
+    private HiveBolt bolt;
+    private final static boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+    @Mock
+    private IOutputCollector collector;
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+
+    public TestHiveBolt() throws Exception {
+        port=9083;
+        dbLocation = new String();
+        //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true";
+        metaStoreURI = null;
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+
+        // driver.init();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        HiveSetupUtil.dropDB(conf, dbName);
+        if(WINDOWS) {
+            dbLocation = dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        } else {
+            dbLocation = "raw://" + dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        }
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals.split(",")),
+                colNames, colTypes, partNames, dbLocation);
+        System.out.println("done");
+    }
+
+    @Test
+    public void testEndpointConnection() throws Exception {
+        // 1) Basic
+        HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+                                              , Arrays.asList(partitionVals.split(",")));
+        StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+        connection.close();
+        // 2) Leave partition unspecified
+        endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+        endPt.newConnection(false, null).close(); // should not throw
+    }
+
+    @Test
+    public void testWithByteArrayIdandMessage()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName,dbName,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 4);
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testWithoutPartitions()
+        throws Exception {
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,null, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2)
+            .withAutoCreatePartitions(false);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        bolt.cleanup();
+        checkRecordCountInTable(tblName1, dbName1, 4);
+    }
+
+    @Test
+    public void testWithTimeformat()
+        throws Exception {
+        String[] partNames1 = {"date"};
+        String timeFormat = "yyyy/MM/dd";
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,partNames1, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField(timeFormat);
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        Date d = new Date();
+        SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
+        String today=parseDate.format(d.getTime());
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 2; i++) {
+            Tuple tuple = generateTestTuple(id,msg,null,null);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testData()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testJsonWriter()
+        throws Exception {
+        // json record doesn't need columns to be in the same order
+        // as table in hive.
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testMultiPartitionTuples()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(10);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 1;
+        String msg = "test";
+        String city = "San Jose";
+        String state = "CA";
+        checkRecordCountInTable(tblName,dbName,0);
+        for(int i=0; i < 100; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 100);
+        bolt.cleanup();
+    }
+
+    private void checkRecordCountInTable(String tableName,String dbName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(tableName,dbName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String tableName,String dbName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+    private void checkDataWritten(String tableName,String dbName,String... row)
+        throws CommandNeedRetryException, IOException {
+        ArrayList<String> results = listRecordsInTable(tableName,dbName);
+        for(int i = 0; i < row.length && results.size() > 0; i++) {
+            String resultRow = results.get(i).replace("\t",",");
+            System.out.println(resultRow);
+            assertEquals(row[i],resultRow);
+        }
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                                             new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg","city","state");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
new file mode 100644
index 0000000..63b1949
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.storm.hive.bolt.HiveSetupUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+
+public class TestHiveWriter {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table2";
+
+    public static final String PART1_NAME = "city";
+    public static final String PART2_NAME = "state";
+    public static final String[] partNames = { PART1_NAME, PART2_NAME };
+    final String[] partitionVals = {"sunnyvale","ca"};
+    final String[] colNames = {"id","msg"};
+    private String[] colTypes = { "int", "string" };
+    private final int port;
+    private final String metaStoreURI;
+    private final HiveConf conf;
+    private ExecutorService callTimeoutPool;
+    private final Driver driver;
+    int timeout = 10000; // msec
+    UserGroupInformation ugi = null;
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+    public TestHiveWriter() throws Exception {
+        port = 9083;
+        metaStoreURI = null;
+        int callTimeoutPoolSize = 1;
+        callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+                                                       new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+        // 1) Start metastore
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+
+        if(metaStoreURI!=null) {
+            conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+        }
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+        driver.init();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // 1) Setup tables
+        HiveSetupUtil.dropDB(conf, dbName);
+        String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals),
+                                       colNames,colTypes, partNames, dbLocation);
+    }
+
+    @Test
+    public void testInstantiate() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           ,callTimeoutPool, mapper, ugi);
+        writer.close();
+    }
+
+    @Test
+    public void testWriteBasic() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        writeTuples(writer,mapper,3);
+        writer.flush(false);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    @Test
+    public void testWriteMultiFlush() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        Tuple tuple = generateTestTuple("1","abc");
+        writer.write(mapper.mapRecord(tuple));
+        checkRecordCountInTable(dbName,tblName,0);
+        writer.flush(true);
+
+        tuple = generateTestTuple("2","def");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+
+        tuple = generateTestTuple("3","ghi");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                              new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+    }
+
+    private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
+            throws HiveWriter.WriteFailure, InterruptedException {
+        Integer id = 100;
+        String msg = "test-123";
+        for (int i = 1; i <= count; i++) {
+            Tuple tuple = generateTestTuple(id,msg);
+            writer.write(mapper.mapRecord(tuple));
+        }
+    }
+
+    private void checkRecordCountInTable(String dbName,String tableName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(dbName,tableName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String dbName,String tableName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
new file mode 100644
index 0000000..bc607f3
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.trident;
+
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.task.TopologyContext;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+
+public class TridentHiveTopology {
+    public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("hiveTridentspout1",spout);
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Fields hiveFields = new Fields("id","name","phone","street","city","state");
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (keytab != null && principal != null) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withIdleTimeout(10)
+                .withCallTimeout(30000)
+                .withKerberosKeytab((String)keytab)
+                .withKerberosPrincipal((String)principal);
+        } else  {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withCallTimeout(30000)
+                .withIdleTimeout(10);
+        }
+        StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+        TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+        return topology.build();
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static void main(String[] args) {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if(args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            System.out.println("waiting for 60 seconds");
+            waitForSeconds(60);
+            System.out.println("killing topology");
+            cluster.killTopology("tridenHiveTopology");
+            System.out.println("cluster shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length == 4) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else if (args.length == 6) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else {
+            System.out.println("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]");
+        }
+    }
+
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+            new Values("1","user1","123456","street1","sunnyvale","ca"),
+            new Values("2","user2","123456","street2","sunnyvale","ca"),
+            new Values("3","user3","123456","street3","san jose","ca"),
+            new Values("4","user4","123456","street4","san jose","ca"),
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("id","name","phone","street","city","state");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if(batch == null){
+                batch = new ArrayList<List<Object>>();
+                if(index>=outputs.length && cycle) {
+                    index = 0;
+                }
+                for(int i=0; i < maxBatchSize; index++, i++) {
+                    if(index == outputs.length){
+                        index=0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for(List<Object> list : batch){
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af86bd0..6265d9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>
+        <module>external/storm-hive</module>
     </modules>
 
     <scm>
@@ -212,7 +213,8 @@
         <zookeeper.version>3.4.6</zookeeper.version>
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
-
+        <hive.version>0.13.0</hive.version>
+        <hadoop.version>2.6.0</hadoop.version>
     </properties>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index ebff853..51cfd14 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -103,6 +103,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive/target</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
 
     </fileSets>


[05/13] storm git commit: STORM-539. Storm hive bolt and trident state.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81772b22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81772b22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81772b22

Branch: refs/heads/master
Commit: 81772b22668d51852b65bc8bbe0d83116c07e383
Parents: 8036109
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Dec 15 14:24:51 2014 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 08:53:43 2015 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +-
 external/storm-hive/README.md                   | 111 +++++
 external/storm-hive/pom.xml                     | 143 +++++++
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 289 +++++++++++++
 .../bolt/mapper/DelimitedRecordHiveMapper.java  | 143 +++++++
 .../storm/hive/bolt/mapper/HiveMapper.java      |  81 ++++
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  | 132 ++++++
 .../apache/storm/hive/common/HiveOptions.java   | 146 +++++++
 .../org/apache/storm/hive/common/HiveUtils.java |  76 ++++
 .../apache/storm/hive/common/HiveWriter.java    | 420 +++++++++++++++++++
 .../apache/storm/hive/trident/HiveState.java    | 306 ++++++++++++++
 .../storm/hive/trident/HiveStateFactory.java    |  31 ++
 .../apache/storm/hive/trident/HiveUpdater.java  |  14 +
 .../apache/storm/hive/bolt/HiveSetupUtil.java   | 220 ++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    | 150 +++++++
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 +++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 330 +++++++++++++++
 .../storm/hive/common/TestHiveWriter.java       | 193 +++++++++
 .../storm/hive/trident/TridentHiveTopology.java | 190 +++++++++
 pom.xml                                         |   4 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 21 files changed, 3148 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ef159e1..df109b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,4 +31,6 @@ target
 .*
 !/.gitignore
 _site
-storm-core/dependency-reduced-pom.xml
+dependency-reduced-pom.xml
+derby.log
+metastore_db

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
new file mode 100644
index 0000000..6461462
--- /dev/null
+++ b/external/storm-hive/README.md
@@ -0,0 +1,111 @@
+# Storm Hive Bolt & Trident State
+
+  Hive offers a streaming API that allows data to be written continuously into Hive. Incoming data
+  can be committed in small batches of records into an existing Hive partition or table. Once the data
+  is committed, it is immediately visible to all Hive queries. More info on the Hive Streaming API:
+  https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+  
+  With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
+  To use the Hive Streaming API, users need to create a bucketed table stored as ORC; for example (the bucket column and bucket count below are illustrative):
+  
+  ```sql
+  create table test_table ( id INT, name STRING, phone STRING, street STRING)
+    partitioned by (city STRING, state STRING)
+    clustered by (id) into 5 buckets
+    stored as orc tblproperties ("orc.compress"="NONE");
+  ```
+  
+
+## HiveBolt
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
+Partitions to which HiveBolt streams can be pre-created, or optionally
+HiveBolt can create them if they are missing. Fields from tuples are mapped to table columns.
+Users should make sure that the tuple field names match the table column names.
+
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
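+
+For context, a minimal sketch of attaching the bolt to a topology (the spout, component ids, and parallelism here are illustrative; UserDataSpout is the spout bundled with the test topology):
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+// any spout that emits the fields named in the mapper will do
+builder.setSpout("user-spout", new UserDataSpout(), 1);
+builder.setBolt("hive-bolt", hiveBolt, 1)
+       .shuffleGrouping("user-spout");
+```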
+
+### RecordHiveMapper
+   This class maps tuple field names to Hive table column names.
+   There are two implementations available:
+ 
+   
+   1) DelimitedRecordHiveMapper
+   2) JsonRecordHiveMapper
+   
+   ```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+   // or
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("yyyy/MM/dd");
+   ```
+
+|Arg | Description | Type
+|--- |--- |---
+|withColumnFields| field names in a tuple to be mapped to table column names | Fields (required) |
+|withPartitionFields| field names in a tuple to be mapped to hive table partitions | Fields |
+|withTimeAsPartitionField| users can select the system time as the partition value in the hive table | String. Date format |
+
+### HiveOptions
+  
+HiveBolt takes in HiveOptions as a constructor arg.
+
+  ```java
+  HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+  ```
+
+
+HiveOptions params
+
+|Arg  |Description | Type
+|---	|--- |---
+|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String (required) |
+|dbName | database name | String (required) |
+|tblName | table name | String (required) |
+|mapper| Mapper class to map Tuple field names to Table column names | DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
+|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per transaction batch. Data from all transactions in a single batch ends up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting, in conjunction with batchSize, provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.| Integer. default 100 |
+|withMaxOpenConnections| Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.| Integer . default 100|
+|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
+|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
+|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. | Boolean. default true |
+|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | String |
+|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+
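+On a Kerberos-secured cluster the same options object also carries the principal and keytab; a minimal sketch (the principal and keytab path below are placeholders, not real values):
+
+```java
+HiveOptions secureOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+    .withTxnsPerBatch(10)
+    .withBatchSize(1000)
+    .withKerberosPrincipal("storm/node@EXAMPLE.COM")            // placeholder principal
+    .withKerberosKeytab("/etc/security/keytabs/storm.keytab");  // placeholder keytab path
+```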
+
+ 
+## HiveState
+
+The Hive Trident state follows a pattern similar to HiveBolt; it takes HiveOptions as an arg.
+
+```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("yyyy/MM/dd");
+
+   HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+
+   StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+   TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+ ```
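+
+The `stream` above is an ordinary Trident stream; a minimal sketch of creating one (the spout variable and stream name here are illustrative):
+
+```java
+TridentTopology topology = new TridentTopology();
+// any IBatchSpout that emits the fields listed in hiveFields works here
+Stream stream = topology.newStream("hiveTridentSpout", spout);
+```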
+   
+ 
+ 
+
+
+
+
+
+
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
new file mode 100644
index 0000000..adf7567
--- /dev/null
+++ b/external/storm-hive/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>storm</artifactId>
+    <groupId>org.apache.storm</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <packaging>jar</packaging>
+  <artifactId>storm-hive</artifactId>
+  <name>storm-hive</name>
+  <developers>
+    <developer>
+      <id>harshach</id>
+      <name>Sriharsha Chintalapani</name>
+      <email>mail@harsha.io</email>
+    </developer>
+  </developers>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>0.9.2-incubating</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
new file mode 100644
index 0000000..849697d
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.hive.common.HiveWriter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
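+/**
+ * Bolt that streams tuples into Hive using the Hive Streaming API.
+ * Each tuple is mapped to a record and partition list by the configured HiveMapper,
+ * written through a per-HiveEndPoint HiveWriter, and acked once written; writers are
+ * flushed when the configured batch size is reached, and idle or excess writers are retired.
+ */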
+public class HiveBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+    private OutputCollector collector;
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private Boolean kerberosEnabled = false;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveBolt(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, both KerberosPrincipal " +
+                                                   "and KerberosKeytab must be set");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+            this.collector = collector;
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer = new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            if(timeToSendHeartBeat.compareAndSet(true, false)) {
+                enableHeartBeatOnAllWriters();
+            }
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+            collector.ack(tuple);
+        } catch(Exception e) {
+            collector.fail(tuple);
+            flushAndCloseWriters();
+            LOG.warn("hive streaming failed. ",e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService toShutdown[] = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+        super.cleanup();
+        LOG.info("Hive Bolt stopped");
+    }
+
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    /**
+     * Closes all writers and removes them from the cache.
+     */
+    private void closeAllWriters() {
+        try {
+            //1) Retire writers
+            for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+                entry.getValue().close();
+            }
+            //2) Clear cache
+            allWriters.clear();
+        } catch(Exception e) {
+            LOG.warn("unable to close writers. ", e);
+        }
+    }
+
+    private void flushAndCloseWriters() {
+        try {
+            flushAllWriters();
+        } catch(Exception e) {
+            LOG.warn("unable to flush hive writers. ", e);
+        } finally {
+            closeAllWriters();
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.debug("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Locate the writer that has not been used for the longest time and retire it.
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: " + ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
new file mode 100644
index 0000000..d516795
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class DelimitedRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(DelimitedRecordHiveMapper.class);
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String[] columnNames;
+    private String timeFormat;
+    private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+    private SimpleDateFormat parseDate;
+
+    public DelimitedRecordHiveMapper() {
+    }
+
+    public DelimitedRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        List<String> tempColumnNamesList = this.columnFields.toList();
+        columnNames = new String[tempColumnNamesList.size()];
+        tempColumnNamesList.toArray(columnNames);
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter){
+        this.fieldDelimiter = delimiter;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new DelimitedInputWriter(columnNames, fieldDelimiter,endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
new file mode 100644
index 0000000..a3b5531
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import java.util.List;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import java.io.Serializable;
+
+import java.io.IOException;
+
+/**
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
+ * to a row in a Hive table.
+ */
+public interface HiveMapper extends Serializable {
+
+    /**
+     * Given an endPoint, returns a RecordWriter with columnNames.
+     *
+     * @param endPoint
+     * @return RecordWriter
+     */
+
+    RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException;
+
+    void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException;
+
+    /**
+     * Given a tuple, return a Hive partition values list.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(Tuple tuple);
+
+    /**
+     * Given a tuple, maps it to a Hive record based on columnFields.
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(Tuple tuple);
+
+    /**
+     * Given a TridentTuple, return a Hive partition values list.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(TridentTuple tuple);
+
+    /**
+     * Given a TridentTuple, maps it to a Hive record based on columnFields.
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(TridentTuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
new file mode 100644
index 0000000..ce3e475
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class JsonRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHiveMapper.class);
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String timeFormat;
+    private SimpleDateFormat parseDate;
+
+    public JsonRecordHiveMapper() {
+    }
+
+    public JsonRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new StrictJsonWriter(endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
new file mode 100644
index 0000000..d316294
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.Serializable;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+
+public class HiveOptions implements Serializable {
+    protected HiveMapper mapper;
+    protected String databaseName;
+    protected String tableName;
+    protected String metaStoreURI;
+    protected Integer txnsPerBatch = 100;
+    protected Integer maxOpenConnections = 500;
+    protected Integer batchSize = 15000;
+    protected Integer idleTimeout = 0;
+    protected Integer callTimeout = 10000;
+    protected Integer heartBeatInterval = 240;
+    protected Boolean autoCreatePartitions = true;
+    protected String kerberosPrincipal;
+    protected String kerberosKeytab;
+
+    public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
+        this.metaStoreURI = metaStoreURI;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.mapper = mapper;
+    }
+
+    public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
+        this.txnsPerBatch = txnsPerBatch;
+        return this;
+    }
+
+    public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
+        this.maxOpenConnections = maxOpenConnections;
+        return this;
+    }
+
+    public HiveOptions withBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HiveOptions withIdleTimeout(Integer idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    public HiveOptions withCallTimeout(Integer callTimeout) {
+        this.callTimeout = callTimeout;
+        return this;
+    }
+
+    public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+        return this;
+    }
+
+    public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
+        this.autoCreatePartitions = autoCreatePartitions;
+        return this;
+    }
+
+    public HiveOptions withKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+        return this;
+    }
+
+    public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+        return this;
+    }
+
+    public String getMetaStoreURI() {
+        return metaStoreURI;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public HiveMapper getMapper() {
+        return mapper;
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public Integer getCallTimeOut() {
+        return callTimeout;
+    }
+
+    public Integer getHeartBeatInterval() {
+        return heartBeatInterval;
+    }
+
+    public Integer getMaxOpenConnections() {
+        return maxOpenConnections;
+    }
+
+    public Integer getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public Integer getTxnsPerBatch() {
+        return txnsPerBatch;
+    }
+
+    public Boolean getAutoCreatePartitions() {
+        return autoCreatePartitions;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
new file mode 100644
index 0000000..5483b07
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.io.File;
+import java.io.IOException;
+
+public class HiveUtils {
+
+    public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
+        if(partitionVals==null) {
+            return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
+        }
+        return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
+    }
+
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
+                              options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi);
+    }
+
+    public static synchronized UserGroupInformation authenticate(String keytab, String principal)
+    throws AuthenticationFailed {
+        File kfile = new File(keytab);
+        if (!(kfile.isFile() && kfile.canRead())) {
+            throw new IllegalArgumentException("The keytab file: "
+                                               + keytab + " does not exist or cannot be read. "
+                                               + "Please specify a readable keytab file for Kerberos auth.");
+        }
+        try {
+            principal = SecurityUtil.getServerPrincipal(principal, "");
+        } catch (Exception e) {
+            throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
+        }
+        try {
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            return UserGroupInformation.getLoginUser();
+        } catch (IOException e) {
+            throw new AuthenticationFailed("Login failed for principal " + principal, e);
+        }
+    }
+
+     public static class AuthenticationFailed extends Exception {
+         public AuthenticationFailed(String reason, Exception cause) {
+             super("Kerberos Authentication Failed. " + reason, cause);
+         }
+     }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
new file mode 100644
index 0000000..726b8e8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import backtype.storm.tuple.Tuple;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
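+/**
+ * Wraps a single Hive Streaming connection and its current TransactionBatch for one
+ * HiveEndPoint. Every Hive call (write, commit, heartbeat, abort, close) is executed on
+ * the supplied callTimeoutPool so it can be bounded by the configured call timeout.
+ */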
+public class HiveWriter {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(HiveWriter.class);
+
+    private final HiveEndPoint endPoint;
+    private final StreamingConnection connection;
+    private final int txnsPerBatch;
+    private final RecordWriter recordWriter;
+    private TransactionBatch txnBatch;
+    private final ExecutorService callTimeoutPool;
+    private final long callTimeout;
+
+    private long lastUsed; // time of last flush on this writer
+    protected boolean closed; // flag indicating HiveWriter was closed
+    private boolean autoCreatePartitions;
+    private boolean heartBeatNeeded = false;
+    private UserGroupInformation ugi;
+
+    public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+                      boolean autoCreatePartitions, long callTimeout,
+                      ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            this.autoCreatePartitions = autoCreatePartitions;
+            this.callTimeout = callTimeout;
+            this.callTimeoutPool = callTimeoutPool;
+            this.endPoint = endPoint;
+            this.ugi = ugi;
+            this.connection = newConnection(ugi);
+            this.txnsPerBatch = txnsPerBatch;
+            this.recordWriter = mapper.createRecordWriter(endPoint);
+            this.txnBatch = nextTxnBatch(recordWriter);
+            this.closed = false;
+            this.lastUsed = System.currentTimeMillis();
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(RuntimeException e) {
+            throw e;
+        } catch(Exception e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return endPoint.toString();
+    }
+
+    public void setHeartBeatNeeded() {
+        heartBeatNeeded = true;
+    }
+
+    /**
+     * Write data <br />
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public synchronized void write(final byte[] record)
+        throws WriteFailure, InterruptedException {
+        if (closed) {
+            throw new IllegalStateException("This hive streaming writer was closed " +
+                                            "and thus no longer able to write : " + endPoint);
+        }
+        // write the tuple
+        try {
+            LOG.debug("Writing event to {}", endPoint);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.write(record);
+                        return null;
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch(TimeoutException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    /**
+     * Commits the current Txn.
+     * If 'rollToNext' is true, will switch to next Txn in batch or to a
+     *       new TxnBatch if current Txn batch is exhausted
+     * TODO: see what to do when there are errors in each IO call stage
+     */
+    public void flush(boolean rollToNext)
+        throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
+        if(heartBeatNeeded) {
+            heartBeatNeeded = false;
+            heartBeat();
+        }
+        lastUsed = System.currentTimeMillis();
+        try {
+            commitTxn();
+            if(txnBatch.remainingTransactions() == 0) {
+                closeTxnBatch();
+                txnBatch = null;
+                if(rollToNext) {
+                    txnBatch = nextTxnBatch(recordWriter);
+                }
+            }
+            if(rollToNext) {
+                LOG.debug("Switching to next Txn for {}", endPoint);
+                txnBatch.beginNextTransaction(); // does not block
+            }
+        } catch(StreamingException e) {
+            throw new TxnFailure(txnBatch, e);
+        }
+    }
+
+    /** Queues up a heartbeat request on the current and remaining txns using the
+     *  heartbeatThdPool and returns immediately
+     */
+    public void heartBeat() throws InterruptedException {
+        // 1) schedule the heartbeat on one thread in pool
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws Exception {
+                        try {
+                            LOG.debug("Sending heartbeat on batch " + txnBatch);
+                            txnBatch.heartbeat();
+                        } catch (StreamingException e) {
+                            LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                        }
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+            // Suppressing exceptions as we don't care for errors on heartbeats
+        }
+    }
+
+    /**
+     * Close the Transaction Batch and connection
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close() throws IOException, InterruptedException {
+        closeTxnBatch();
+        closeConnection();
+        closed = true;
+    }
+
+    private void closeConnection() throws InterruptedException {
+        LOG.info("Closing connection to end point : {}", endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        connection.close(); // could block
+                        return null;
+                    }
+                });
+        } catch(Exception e) {
+            LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on connection close
+        }
+    }
+
+    private void commitTxn() throws CommitFailure, InterruptedException {
+        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        txnBatch.commit(); // could block
+                        return null;
+                    }
+                });
+        } catch (StreamingException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch (TimeoutException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    private StreamingConnection newConnection(final UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            return  callWithTimeout(new CallRunner<StreamingConnection>() {
+                    @Override
+                    public StreamingConnection call() throws Exception {
+                        return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new ConnectFailure(endPoint, e);
+        } catch(TimeoutException e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+        throws InterruptedException, TxnBatchFailure {
+        LOG.debug("Fetching new Txn Batch for {}", endPoint);
+        TransactionBatch batch = null;
+        try {
+            batch = callWithTimeout(new CallRunner<TransactionBatch>() {
+                @Override
+                public TransactionBatch call() throws Exception {
+                    return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+                }
+            });
+            LOG.debug("Acquired {}. Switching to first txn", batch);
+            batch.beginNextTransaction();
+        } catch(TimeoutException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        } catch(StreamingException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        }
+        return batch;
+    }
+
+    private void closeTxnBatch() throws  InterruptedException {
+        try {
+            LOG.debug("Closing Txn Batch {}", txnBatch);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws Exception {
+                        if(txnBatch != null) {
+                            txnBatch.close(); // could block
+                        }
+                        return null;
+                    }
+                });
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(Exception e) {
+            LOG.warn("Error closing txn batch "+ txnBatch, e);
+        }
+    }
+
+    /**
+     * Aborts the current Txn and switches to next Txn.
+     * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn
+     */
+    public void abort() throws InterruptedException {
+        abortTxn();
+    }
+
+    private void abortTxn() throws InterruptedException {
+        LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                        public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.abort(); // could block
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+        } catch (Exception e) {
+            LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on abort
+        }
+    }
+
+
+    /**
+     * If the current thread has been interrupted, then throws an
+     * exception.
+     * @throws InterruptedException
+     */
+    private static void checkAndThrowInterruptedException()
+        throws InterruptedException {
+        if (Thread.interrupted()) {
+            throw new InterruptedException("Timed out before Hive call was made. "
+                                           + "Your callTimeout might be set too low or Hive calls are "
+                                           + "taking too long.");
+        }
+    }
+
+    /**
+     * Execute the callable on a separate thread and wait for the completion
+     * for the specified amount of time in milliseconds. In case of timeout
+     * cancel the callable and throw an IOException
+     */
+    private <T> T callWithTimeout(final CallRunner<T> callRunner)
+        throws TimeoutException, StreamingException, InterruptedException {
+        Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    return callRunner.call();
+                }
+            });
+        try {
+            if (callTimeout > 0) {
+                return future.get(callTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                return future.get();
+            }
+        } catch (TimeoutException eT) {
+            future.cancel(true);
+            throw eT;
+        } catch (ExecutionException e1) {
+            Throwable cause = e1.getCause();
+            if (cause instanceof IOException) {
+                throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+            } else if (cause instanceof StreamingException) {
+                throw (StreamingException) cause;
+            } else if (cause instanceof InterruptedException) {
+                throw (InterruptedException) cause;
+            } else if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else if (cause instanceof TimeoutException) {
+                throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+            } else {
+                throw new RuntimeException(e1);
+            }
+        }
+    }
+
+    public long getLastUsed() {
+        return lastUsed;
+    }
+
+    private byte[] generateRecord(Tuple tuple) {
+        StringBuilder buf = new StringBuilder();
+        for (Object o: tuple.getValues()) {
+            buf.append(o);
+            buf.append(",");
+        }
+        return buf.toString().getBytes();
+    }
+
+    /**
+     * Simple interface whose <tt>call</tt> method is called by
+     * {#callWithTimeout} in a new thread inside a
+     * {@linkplain java.security.PrivilegedExceptionAction#run()} call.
+     * @param <T>
+     */
+    private interface CallRunner<T> {
+        T call() throws Exception;
+    }
+
+    public static class Failure extends Exception {
+        public Failure(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    public static class WriteFailure extends Failure {
+        public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+            super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+        }
+    }
+
+    public static class CommitFailure extends Failure {
+        public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+            super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
+        }
+    }
+
+    public static class ConnectFailure extends Failure {
+        public ConnectFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed connecting to EndPoint " + ep, cause);
+        }
+    }
+
+    public static class TxnBatchFailure extends Failure {
+        public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+        }
+    }
+
+    public static class TxnFailure extends Failure {
+        public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+            super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
new file mode 100644
index 0000000..6050aa8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
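+/**
+ * Trident State that writes batches of TridentTuples to Hive using the Hive Streaming API.
+ * Records are mapped by the configured HiveMapper and written through per-HiveEndPoint
+ * HiveWriters; when a batch fails, all writers are aborted and closed and a
+ * FailedException is thrown so Trident can replay the batch.
+ */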
+public class HiveState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    private Boolean kerberosEnabled = false;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveState(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+
+    @Override
+    public void beginCommit(Long txId) {
+    }
+
+    @Override
+    public void commit(Long txId) {
+    }
+
+    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
+                                                   " & KerberosKeytab");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer= new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            writeTuples(tuples);
+        } catch (Exception e) {
+            abortAndCloseWriters();
+            LOG.warn("hive streaming failed.",e);
+            throw new FailedException(e);
+        }
+    }
+
+    private void writeTuples(List<TridentTuple> tuples)
+        throws Exception {
+        if(timeToSendHeartBeat.compareAndSet(true, false)) {
+            enableHeartBeatOnAllWriters();
+        }
+        for (TridentTuple tuple : tuples) {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+        }
+    }
+
+    private void abortAndCloseWriters() {
+        try {
+            abortAllWriters();
+            closeAllWriters();
+        } catch(InterruptedException e) {
+            LOG.warn("unable to close hive connections. ", e);
+        } catch(IOException ie) {
+            LOG.warn("unable to close hive connections. ", ie);
+        }
+    }
+
+    /**
+     * Aborts the current transaction on all writers.
+     */
+    private void abortAllWriters() throws InterruptedException {
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().abort();
+        }
+    }
+
+
+    /**
+     * Closes all writers and removes them from the cache.
+     */
+    private void closeAllWriters() throws InterruptedException, IOException {
+        //1) Retire writers
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().close();
+        }
+        //2) Clear cache
+        allWriters.clear();
+    }
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.info("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+
+    }
+
+
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService[] toShutdown = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
new file mode 100644
index 0000000..8f3b9e9
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -0,0 +1,31 @@
+package org.apache.storm.hive.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+
+
+public class HiveStateFactory implements StateFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class);
+    private HiveOptions options;
+
+    public HiveStateFactory(){}
+
+    public HiveStateFactory withOptions(HiveOptions options){
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+        HiveState state = new HiveState(this.options);
+        state.prepare(conf, metrics, partitionIndex, numPartitions);
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
new file mode 100644
index 0000000..b0b32f1
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class HiveUpdater extends BaseStateUpdater<HiveState>{
+    @Override
+    public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
new file mode 100644
index 0000000..d492819
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.thrift.TException;
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveSetupUtil {
+    public static class RawFileSystem extends RawLocalFileSystem {
+        private static final URI NAME;
+        static {
+            try {
+                NAME = new URI("raw:///");
+            } catch (URISyntaxException se) {
+                throw new IllegalArgumentException("bad uri", se);
+            }
+        }
+
+        @Override
+        public URI getUri() {
+            return NAME;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            File file = pathToFile(path);
+            if (!file.exists()) {
+                throw new FileNotFoundException("Can't find " + path);
+            }
+            // get close enough
+            short mod = 0;
+            if (file.canRead()) {
+                mod |= 0444;
+            }
+            if (file.canWrite()) {
+                mod |= 0200;
+            }
+            if (file.canExecute()) {
+                mod |= 0111;
+            }
+            ShimLoader.getHadoopShims();
+            return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+                                  file.lastModified(), file.lastModified(),
+                                  FsPermission.createImmutable(mod), "owen", "users", path);
+        }
+    }
+
+    private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+    public static HiveConf getHiveConf() {
+        HiveConf conf = new HiveConf();
+        // String metastoreDBLocation = "jdbc:derby:databaseName=/tmp/metastore_db;create=true";
+        // conf.set("javax.jdo.option.ConnectionDriverName","org.apache.derby.jdbc.EmbeddedDriver");
+        // conf.set("javax.jdo.option.ConnectionURL",metastoreDBLocation);
+        conf.set("fs.raw.impl", RawFileSystem.class.getName());
+        conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+        return conf;
+    }
+
+    public static void createDbAndTable(HiveConf conf, String databaseName,
+                                        String tableName, List<String> partVals,
+                                        String[] colNames, String[] colTypes,
+                                        String[] partNames, String dbLocation)
+        throws Exception {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            Database db = new Database();
+            db.setName(databaseName);
+            db.setLocationUri(dbLocation);
+            client.createDatabase(db);
+
+            Table tbl = new Table();
+            tbl.setDbName(databaseName);
+            tbl.setTableName(tableName);
+            tbl.setTableType(TableType.MANAGED_TABLE.toString());
+            StorageDescriptor sd = new StorageDescriptor();
+            sd.setCols(getTableColumns(colNames, colTypes));
+            sd.setNumBuckets(1);
+            sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
+            if(partNames!=null && partNames.length!=0) {
+                tbl.setPartitionKeys(getPartitionKeys(partNames));
+            }
+
+            tbl.setSd(sd);
+
+            sd.setBucketCols(new ArrayList<String>(2));
+            sd.setSerdeInfo(new SerDeInfo());
+            sd.getSerdeInfo().setName(tbl.getTableName());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+            sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
+            sd.setInputFormat(OrcInputFormat.class.getName());
+            sd.setOutputFormat(OrcOutputFormat.class.getName());
+
+            Map<String, String> tableParams = new HashMap<String, String>();
+            tbl.setParameters(tableParams);
+            client.createTable(tbl);
+            try {
+                if(partVals!=null && partVals.size() > 0) {
+                    addPartition(client, tbl, partVals);
+                }
+            } catch(AlreadyExistsException e) {
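+                // the partition already exists; safe to ignore for this test setup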
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    // delete db and all tables in it
+    public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
+                client.dropTable(databaseName, table, true, true);
+            }
+            client.dropDatabase(databaseName);
+        } catch (TException e) {
+            // best-effort cleanup for tests; ignore failures such as a missing database
+        } finally {
+            client.close();
+        }
+    }
+
+    private static void addPartition(IMetaStoreClient client, Table tbl
+                                     , List<String> partValues)
+        throws IOException, TException {
+        Partition part = new Partition();
+        part.setDbName(tbl.getDbName());
+        part.setTableName(tbl.getTableName());
+        StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
+        sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys(), partValues));
+        part.setSd(sd);
+        part.setValues(partValues);
+        client.add_partition(part);
+    }
+
+    private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
+        if(partKeys.size()!=partVals.size()) {
+            throw new IllegalArgumentException("Partition values:" + partVals +
+                                               ", does not match the partition Keys in table :" + partKeys );
+        }
+        StringBuffer buff = new StringBuffer(partKeys.size()*20);
+        int i=0;
+        for(FieldSchema schema : partKeys) {
+            buff.append(schema.getName());
+            buff.append("=");
+            buff.append(partVals.get(i));
+            if(i!=partKeys.size()-1) {
+                buff.append(Path.SEPARATOR);
+            }
+            ++i;
+        }
+        return buff.toString();
+    }
+
+    private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i<colNames.length; ++i) {
+            fields.add(new FieldSchema(colNames[i], colTypes[i], ""));
+        }
+        return fields;
+    }
+
+    private static List<FieldSchema> getPartitionKeys(String[] partNames) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i < partNames.length; ++i) {
+           fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
+        }
+        return fields;
+    }
+
+}


[07/13] storm git commit: Merge branch 'STORM-539-V2' of https://github.com/harshach/incubator-storm into STORM-539-V2

Posted by sr...@apache.org.
Merge branch 'STORM-539-V2' of https://github.com/harshach/incubator-storm into STORM-539-V2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/674ceae7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/674ceae7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/674ceae7

Branch: refs/heads/master
Commit: 674ceae7bdfba512cad0053dc5fd34026cc5b57f
Parents: e85b79a dfb8e37
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 12 09:46:50 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 09:46:50 2015 -0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[08/13] storm git commit: STORM-539. Storm hive bolt and trident state. Added reportError for HiveBolt.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state. Added reportError for HiveBolt.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0cbb493
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0cbb493
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0cbb493

Branch: refs/heads/master
Commit: b0cbb493e541b4b3762b1adfed695abbb39ec863
Parents: 674ceae
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 12 09:47:04 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 09:47:04 2015 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/hive/bolt/HiveBolt.java         | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b0cbb493/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 849697d..4d9f5da 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -111,9 +111,9 @@ public class HiveBolt extends  BaseRichBolt {
             }
             collector.ack(tuple);
         } catch(Exception e) {
+            this.collector.reportError(e);
             collector.fail(tuple);
             flushAndCloseWriters();
-            LOG.warn("hive streaming failed. ",e);
         }
     }
 


[11/13] storm git commit: Added storm-hive to modules.

Posted by sr...@apache.org.
Added storm-hive to modules.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e62c1633
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e62c1633
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e62c1633

Branch: refs/heads/master
Commit: e62c163335ad6d75d6a6de7d09f3c7d7ff5cbb77
Parents: 58a3479
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Feb 23 14:58:34 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Feb 23 14:58:34 2015 -0800

----------------------------------------------------------------------
 pom.xml                                        |  1 +
 storm-dist/binary/src/main/assembly/binary.xml | 14 ++++++++++++++
 2 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e62c1633/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1f7ff79..73f8f62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>
+        <module>external/storm-hive</module>
         <module>external/storm-jdbc</module>
         <module>external/storm-redis</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/storm/blob/e62c1633/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index dffffec..246f350 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -117,6 +117,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive/target</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
     </fileSets>
 
     <files>


[02/13] storm git commit: STORM-539. Storm hive bolt and trident state.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01ab7b14
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01ab7b14
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01ab7b14

Branch: refs/heads/master
Commit: 01ab7b141c408809d6491556494bded0d436ba4d
Parents: 3bbdc16
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Dec 15 14:24:51 2014 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Dec 15 14:24:51 2014 -0800

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 external/storm-hive/README.md                   | 111 +++++
 external/storm-hive/pom.xml                     | 143 +++++++
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 289 +++++++++++++
 .../bolt/mapper/DelimitedRecordHiveMapper.java  | 143 +++++++
 .../storm/hive/bolt/mapper/HiveMapper.java      |  81 ++++
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  | 132 ++++++
 .../apache/storm/hive/common/HiveOptions.java   | 146 +++++++
 .../org/apache/storm/hive/common/HiveUtils.java |  76 ++++
 .../apache/storm/hive/common/HiveWriter.java    | 420 +++++++++++++++++++
 .../apache/storm/hive/trident/HiveState.java    | 306 ++++++++++++++
 .../storm/hive/trident/HiveStateFactory.java    |  31 ++
 .../apache/storm/hive/trident/HiveUpdater.java  |  14 +
 .../apache/storm/hive/bolt/HiveSetupUtil.java   | 220 ++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    | 150 +++++++
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 +++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 330 +++++++++++++++
 .../storm/hive/common/TestHiveWriter.java       | 193 +++++++++
 .../storm/hive/trident/TridentHiveTopology.java | 190 +++++++++
 pom.xml                                         |   4 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 21 files changed, 3148 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5626872..df109b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,6 @@ target
 .*
 !/.gitignore
 _site
+dependency-reduced-pom.xml
+derby.log
+metastore_db

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
new file mode 100644
index 0000000..6461462
--- /dev/null
+++ b/external/storm-hive/README.md
@@ -0,0 +1,111 @@
+# Storm Hive Bolt & Trident State
+
+  Hive offers a streaming API that allows data to be written continuously into Hive. The incoming data
+  can be committed continuously in small batches of records into an existing Hive partition or table. Once the data
+  is committed it is immediately visible to all Hive queries. More info on the Hive Streaming API is available at
+  https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+  
+  With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
+  To use the Hive Streaming API, users need to create a bucketed table stored as ORC. Example below:
+  
+  ```code
+  create table test_table ( id INT, name STRING, phone STRING, street STRING) partitioned by (city STRING, state STRING) stored as orc tblproperties ("orc.compress"="NONE");
+  ```
+  
+
+## HiveBolt
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
+Partitions to which HiveBolt streams can either be pre-created, or optionally
+HiveBolt can create them if they are missing. Fields from tuples are mapped to table columns.
+Users should make sure that tuple field names match the table column names.
+
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
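+
+A minimal sketch of wiring the bolt into a topology follows; the spout id, spout class, and parallelism
+are illustrative assumptions, and any spout emitting tuples whose field names match `colNames` will work.
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+// hypothetical spout that emits tuples with fields matching colNames
+builder.setSpout("user-spout", new UserDataSpout(), 1);
+// route every tuple to the HiveBolt configured above
+builder.setBolt("hive-bolt", hiveBolt, 1).shuffleGrouping("user-spout");
+StormSubmitter.submitTopology("hive-topology", new Config(), builder.createTopology());
+```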
+
+### RecordHiveMapper
+   A RecordHiveMapper maps tuple field names to Hive table column names.
+   There are two implementations available:
+ 
+   
+   1) DelimitedRecordHiveMapper
+   2) JsonRecordHiveMapper
+   
+   ```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+    or
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+   ```
+
+|Arg | Description | Type
+|--- |--- |---
+|withColumnFields| field names in a tuple to be mapped to table column names | Fields (required) |
+|withPartitionFields| field names in a tuple to be mapped to hive table partitions | Fields |
+|withTimeAsPartitionField| users can select the system time as a partition in the hive table | String. Date format |
+
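+A sketch of the JSON variant, assuming JsonRecordHiveMapper exposes the same builder-style methods as
+DelimitedRecordHiveMapper but writes each tuple as a JSON record instead of a delimited one:
+
+```java
+JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
+        .withColumnFields(new Fields(colNames))
+        .withPartitionFields(new Fields(partNames));
+```
+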
+### HiveOptions
+  
+HiveBolt takes in HiveOptions as a constructor arg.
+
+  ```java
+  HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+  ```
+
+
+HiveOptions params
+
+|Arg  |Description | Type
+|---	|--- |---
+|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String (required) |
+|dbName | database name | String (required) |
+|tblName | table name | String (required) |
+|mapper| Mapper class to map Tuple field names to Table column names | DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
+|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch ends up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.| Integer. default 100 |
+|withMaxOpenConnections| Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.| Integer. default 100|
+|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
+|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
+|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
+|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | String|
+|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+
+
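+A sketch combining several of the optional settings above for a secure Hive cluster, assuming all of the
+with* setters chain as in the examples above; the principal and keytab path are placeholders:
+
+```java
+HiveOptions secureOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+        .withTxnsPerBatch(10)
+        .withBatchSize(1000)
+        .withMaxOpenConnections(50)
+        .withCallTimeout(20000)
+        .withKerberosPrincipal("storm/host@EXAMPLE.COM")            // placeholder principal
+        .withKerberosKeytab("/etc/security/keytabs/storm.keytab");  // placeholder keytab path
+```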
+ 
+## HiveState
+
+Hive Trident state follows a similar pattern to HiveBolt; it takes in HiveOptions as a constructor arg.
+
+```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+
+   HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+
+   StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+   TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+```
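+
+HiveUpdater, included in this module, is a thin BaseStateUpdater that simply forwards each Trident batch
+to HiveState.updateState, so no additional updater logic is needed beyond the snippet above.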
+   
+ 
+ 
+
+
+
+
+
+
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
new file mode 100644
index 0000000..adf7567
--- /dev/null
+++ b/external/storm-hive/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>storm</artifactId>
+    <groupId>org.apache.storm</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <packaging>jar</packaging>
+  <artifactId>storm-hive</artifactId>
+  <name>storm-hive</name>
+  <developers>
+    <developer>
+      <id>harshach</id>
+      <name>Sriharsha Chintalapani</name>
+      <email>mail@harsha.io</email>
+    </developer>
+  </developers>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>0.9.2-incubating</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
new file mode 100644
index 0000000..849697d
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.hive.common.HiveWriter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+public class HiveBolt extends  BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+    private OutputCollector collector;
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private Boolean kerberosEnabled = false;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveBolt(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
+                                                   " & KerberosKeytab");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+            this.collector = collector;
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer = new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            if(timeToSendHeartBeat.compareAndSet(true, false)) {
+                enableHeartBeatOnAllWriters();
+            }
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+            collector.ack(tuple);
+        } catch(Exception e) {
+            collector.fail(tuple);
+            flushAndCloseWriters();
+            LOG.warn("hive streaming failed. ",e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService[] toShutdown = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+        super.cleanup();
+        LOG.info("Hive Bolt stopped");
+    }
+
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    /**
+     * Closes all writers and removes them from the cache.
+     */
+    private void closeAllWriters() {
+        try {
+            //1) Retire writers
+            for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+                entry.getValue().close();
+            }
+            //2) Clear cache
+            allWriters.clear();
+        } catch(Exception e) {
+            LOG.warn("unable to close writers. ", e);
+        }
+    }
+
+    private void flushAndCloseWriters() {
+        try {
+            flushAllWriters();
+        } catch(Exception e) {
+            LOG.warn("unable to flush hive writers. ", e);
+        } finally {
+            closeAllWriters();
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.debug("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
new file mode 100644
index 0000000..d516795
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class DelimitedRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(DelimitedRecordHiveMapper.class);
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String[] columnNames;
+    private String timeFormat;
+    private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+    private SimpleDateFormat parseDate;
+
+    public DelimitedRecordHiveMapper() {
+    }
+
+    public DelimitedRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        List<String> tempColumnNamesList = this.columnFields.toList();
+        columnNames = new String[tempColumnNamesList.size()];
+        tempColumnNamesList.toArray(columnNames);
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter){
+        this.fieldDelimiter = delimiter;
+        return this;
+    }
+
+    public DelimitedRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new DelimitedInputWriter(columnNames, fieldDelimiter,endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        StringBuilder builder = new StringBuilder();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                builder.append(tuple.getValueByField(field));
+                builder.append(fieldDelimiter);
+            }
+        }
+        return builder.toString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

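For illustration, a minimal configuration sketch of this mapper (the field names, delimiter and time format below are hypothetical and not part of this patch): column fields are written in order, separated by the chosen delimiter, and the formatted current time is appended as the last partition value.

    // Hypothetical fields: each tuple's id, name and city values are written
    // pipe-separated; the partition values are the tuple's state plus the
    // formatted write time, e.g. "2015/02/25".
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
        .withColumnFields(new Fields("id", "name", "city"))
        .withPartitionFields(new Fields("state"))
        .withFieldDelimiter("|")
        .withTimeAsPartitionField("yyyy/MM/dd");
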
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
new file mode 100644
index 0000000..a3b5531
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+import java.util.List;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import java.io.Serializable;
+
+import java.io.IOException;
+
+/**
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
+ * to a row in a Hive table.
+ */
+public interface HiveMapper extends Serializable {
+
+    /**
+     * Given an endPoint, returns a RecordWriter configured with this mapper's column names.
+     *
+     * @param endPoint
+     * @return RecordWriter
+     */
+
+    RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException;
+
+    void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException;
+
+    /**
+     * Given a tuple, returns the list of Hive partition values.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(Tuple tuple);
+
+    /**
+     * Given a tuple, maps it to a Hive record based on columnFields.
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(Tuple tuple);
+
+    /**
+     * Given a TridentTuple, returns the list of Hive partition values.
+     *
+     * @param tuple
+     * @return List<String>
+     */
+    List<String> mapPartitions(TridentTuple tuple);
+
+    /**
+     * Given a TridentTuple, maps it to a Hive record based on columnFields.
+     * @param tuple
+     * @return byte[]
+     */
+    byte[] mapRecord(TridentTuple tuple);
+
+}

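To make the contract concrete, a skeletal custom implementation might look like the sketch below (illustrative only, not part of this patch; the single "payload" column and the "region" partition field are assumptions). It reuses DelimitedInputWriter for a one-column table, so mapRecord can simply return the raw field bytes.

    import backtype.storm.tuple.Tuple;
    import storm.trident.tuple.TridentTuple;
    import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.RecordWriter;
    import org.apache.hive.hcatalog.streaming.StreamingException;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    // Illustrative sketch of a custom mapper: one assumed "payload" column,
    // partitioned by an assumed "region" field.
    public class SingleColumnHiveMapper implements HiveMapper {
        private static final String[] COLUMNS = { "payload" };

        @Override
        public RecordWriter createRecordWriter(HiveEndPoint endPoint)
            throws StreamingException, IOException, ClassNotFoundException {
            return new DelimitedInputWriter(COLUMNS, ",", endPoint);
        }

        @Override
        public void write(TransactionBatch txnBatch, Tuple tuple)
            throws StreamingException, IOException, InterruptedException {
            txnBatch.write(mapRecord(tuple));
        }

        @Override
        public List<String> mapPartitions(Tuple tuple) {
            return Collections.singletonList(tuple.getStringByField("region"));
        }

        @Override
        public byte[] mapRecord(Tuple tuple) {
            return tuple.getStringByField("payload").getBytes();
        }

        @Override
        public List<String> mapPartitions(TridentTuple tuple) {
            return Collections.singletonList(tuple.getStringByField("region"));
        }

        @Override
        public byte[] mapRecord(TridentTuple tuple) {
            return tuple.getStringByField("payload").getBytes();
        }
    }
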
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
new file mode 100644
index 0000000..ce3e475
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.bolt.mapper;
+
+
+import backtype.storm.tuple.Fields;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.json.simple.JSONObject;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import java.io.IOException;
+
+public class JsonRecordHiveMapper implements HiveMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHiveMapper.class);
+    private Fields columnFields;
+    private Fields partitionFields;
+    private String timeFormat;
+    private SimpleDateFormat parseDate;
+
+    public JsonRecordHiveMapper() {
+    }
+
+    public JsonRecordHiveMapper withColumnFields(Fields columnFields) {
+        this.columnFields = columnFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withPartitionFields(Fields partitionFields) {
+        this.partitionFields = partitionFields;
+        return this;
+    }
+
+    public JsonRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
+        this.timeFormat = timeFormat;
+        parseDate = new SimpleDateFormat(timeFormat);
+        return this;
+    }
+
+    @Override
+    public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+        throws StreamingException, IOException, ClassNotFoundException {
+        return new StrictJsonWriter(endPoint);
+    }
+
+    @Override
+    public void write(TransactionBatch txnBatch, Tuple tuple)
+        throws StreamingException, IOException, InterruptedException {
+        txnBatch.write(mapRecord(tuple));
+    }
+
+    @Override
+    public List<String> mapPartitions(Tuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(Tuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    @Override
+    public List<String> mapPartitions(TridentTuple tuple) {
+        List<String> partitionList = new ArrayList<String>();
+        if(this.partitionFields != null) {
+            for(String field: this.partitionFields) {
+                partitionList.add(tuple.getStringByField(field));
+            }
+        }
+        if (this.timeFormat != null) {
+            partitionList.add(getPartitionsByTimeFormat());
+        }
+        return partitionList;
+    }
+
+    @Override
+    public byte[] mapRecord(TridentTuple tuple) {
+        JSONObject obj = new JSONObject();
+        if(this.columnFields != null) {
+            for(String field: this.columnFields) {
+                obj.put(field,tuple.getValueByField(field));
+            }
+        }
+        return obj.toJSONString().getBytes();
+    }
+
+    private String getPartitionsByTimeFormat() {
+        Date d = new Date();
+        return parseDate.format(d.getTime());
+    }
+}

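A corresponding sketch for the JSON mapper (field names hypothetical, not part of this patch): each listed tuple field becomes a key in the JSON object handed to StrictJsonWriter, so the target table would normally be defined with a JSON SerDe.

    // Hypothetical fields: records are written as e.g.
    // {"id":1,"name":"alice","city":"sfo"}, with a date-based partition value.
    JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
        .withColumnFields(new Fields("id", "name", "city"))
        .withTimeAsPartitionField("yyyy-MM-dd");
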
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
new file mode 100644
index 0000000..d316294
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.Serializable;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+
+public class HiveOptions implements Serializable {
+    protected HiveMapper mapper;
+    protected String databaseName;
+    protected String tableName;
+    protected String metaStoreURI;
+    protected Integer txnsPerBatch = 100;
+    protected Integer maxOpenConnections = 500;
+    protected Integer batchSize = 15000;
+    protected Integer idleTimeout = 0;
+    protected Integer callTimeout = 10000;
+    protected Integer heartBeatInterval = 240;
+    protected Boolean autoCreatePartitions = true;
+    protected String kerberosPrincipal;
+    protected String kerberosKeytab;
+
+    public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
+        this.metaStoreURI = metaStoreURI;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.mapper = mapper;
+    }
+
+    public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
+        this.txnsPerBatch = txnsPerBatch;
+        return this;
+    }
+
+    public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
+        this.maxOpenConnections = maxOpenConnections;
+        return this;
+    }
+
+    public HiveOptions withBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HiveOptions withIdleTimeout(Integer idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    public HiveOptions withCallTimeout(Integer callTimeout) {
+        this.callTimeout = callTimeout;
+        return this;
+    }
+
+    public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+        return this;
+    }
+
+    public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
+        this.autoCreatePartitions = autoCreatePartitions;
+        return this;
+    }
+
+    public HiveOptions withKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+        return this;
+    }
+
+    public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+        return this;
+    }
+
+    public String getMetaStoreURI() {
+        return metaStoreURI;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public HiveMapper getMapper() {
+        return mapper;
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public Integer getCallTimeOut() {
+        return callTimeout;
+    }
+
+    public Integer getHeartBeatInterval() {
+        return heartBeatInterval;
+    }
+
+    public Integer getMaxOpenConnections() {
+        return maxOpenConnections;
+    }
+
+    public Integer getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public Integer getTxnsPerBatch() {
+        return txnsPerBatch;
+    }
+
+    public Boolean getAutoCreatePartitions() {
+        return autoCreatePartitions;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+}

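Beyond the constructor arguments, the remaining knobs keep the defaults above; a tuning sketch follows (the values are arbitrary examples and 'mapper' is assumed from an earlier sketch). Per the code in this class and in HiveWriter/HiveState below, callTimeout is in milliseconds and heartBeatInterval in seconds.

    HiveOptions tunedOptions = new HiveOptions("thrift://metastore-host:9083",
                                               "default", "events", mapper)
        .withCallTimeout(20000)           // per-call timeout for Hive operations, ms
        .withHeartBeatInterval(120)       // transaction heartbeat period, s
        .withMaxOpenConnections(50)       // cap on cached writers before retiring one
        .withAutoCreatePartitions(false); // require partitions to already exist
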
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
new file mode 100644
index 0000000..5483b07
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.hive.hcatalog.streaming.*;
+
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.io.File;
+import java.io.IOException;
+
+public class HiveUtils {
+
+    public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
+        if(partitionVals==null) {
+            return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
+        }
+        return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
+    }
+
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
+                              options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi);
+    }
+
+    public static synchronized UserGroupInformation authenticate(String keytab, String principal)
+    throws AuthenticationFailed {
+        File kfile = new File(keytab);
+        if (!(kfile.isFile() && kfile.canRead())) {
+            throw new IllegalArgumentException("The keyTab file: "
+                                               + keytab + " is nonexistent or can't read. "
+                                               + "Please specify a readable keytab file for Kerberos auth.");
+        }
+        try {
+            principal = SecurityUtil.getServerPrincipal(principal, "");
+        } catch (Exception e) {
+            throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
+        }
+        try {
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            return UserGroupInformation.getLoginUser();
+        } catch (IOException e) {
+            throw new AuthenticationFailed("Login failed for principal " + principal, e);
+        }
+    }
+
+    public static class AuthenticationFailed extends Exception {
+        public AuthenticationFailed(String reason, Exception cause) {
+            super("Kerberos Authentication Failed. " + reason, cause);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
new file mode 100644
index 0000000..726b8e8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import backtype.storm.tuple.Tuple;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HiveWriter {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(HiveWriter.class);
+
+    private final HiveEndPoint endPoint;
+    private final StreamingConnection connection;
+    private final int txnsPerBatch;
+    private final RecordWriter recordWriter;
+    private TransactionBatch txnBatch;
+    private final ExecutorService callTimeoutPool;
+    private final long callTimeout;
+
+    private long lastUsed; // time of last flush on this writer
+    protected boolean closed; // flag indicating HiveWriter was closed
+    private boolean autoCreatePartitions;
+    private boolean heartBeatNeeded = false;
+    private UserGroupInformation ugi;
+
+    public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+                      boolean autoCreatePartitions, long callTimeout,
+                      ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            this.autoCreatePartitions = autoCreatePartitions;
+            this.callTimeout = callTimeout;
+            this.callTimeoutPool = callTimeoutPool;
+            this.endPoint = endPoint;
+            this.ugi = ugi;
+            this.connection = newConnection(ugi);
+            this.txnsPerBatch = txnsPerBatch;
+            this.recordWriter = mapper.createRecordWriter(endPoint);
+            this.txnBatch = nextTxnBatch(recordWriter);
+            this.closed = false;
+            this.lastUsed = System.currentTimeMillis();
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(RuntimeException e) {
+            throw e;
+        } catch(Exception e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return endPoint.toString();
+    }
+
+    public void setHeartBeatNeeded() {
+        heartBeatNeeded = true;
+    }
+
+    /**
+     * Writes a single record to the current transaction batch.
+     *
+     * @throws WriteFailure
+     * @throws InterruptedException
+     */
+    public synchronized void write(final byte[] record)
+        throws WriteFailure, InterruptedException {
+        if (closed) {
+            throw new IllegalStateException("This hive streaming writer was closed " +
+                                            "and thus no longer able to write : " + endPoint);
+        }
+        // write the tuple
+        try {
+            LOG.debug("Writing event to {}", endPoint);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.write(record);
+                        return null;
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch(TimeoutException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    /**
+     * Commits the current Txn.
+     * If 'rollToNext' is true, will switch to next Txn in batch or to a
+     *       new TxnBatch if current Txn batch is exhausted
+     * TODO: see what to do when there are errors in each IO call stage
+     */
+    public void flush(boolean rollToNext)
+        throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
+        if(heartBeatNeeded) {
+            heartBeatNeeded = false;
+            heartBeat();
+        }
+        lastUsed = System.currentTimeMillis();
+        try {
+            commitTxn();
+            if(txnBatch.remainingTransactions() == 0) {
+                closeTxnBatch();
+                txnBatch = null;
+                if(rollToNext) {
+                    txnBatch = nextTxnBatch(recordWriter);
+                }
+            }
+            if(rollToNext) {
+                LOG.debug("Switching to next Txn for {}", endPoint);
+                txnBatch.beginNextTransaction(); // does not block
+            }
+        } catch(StreamingException e) {
+            throw new TxnFailure(txnBatch, e);
+        }
+    }
+
+    /** Sends a heartbeat on the current transaction batch to keep its unused
+     *  transactions from timing out; heartbeat failures are logged and suppressed.
+     */
+    public void heartBeat() throws InterruptedException {
+        // 1) schedule the heartbeat on one thread in pool
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        try {
+                            LOG.debug("Sending heartbeat on batch " + txnBatch);
+                            txnBatch.heartbeat();
+                        } catch (StreamingException e) {
+                            LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                        }
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+            // Suppressing exceptions as we don't care for errors on heartbeats
+        }
+    }
+
+    /**
+     * Close the Transaction Batch and connection
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close() throws IOException, InterruptedException {
+        closeTxnBatch();
+        closeConnection();
+        closed = true;
+    }
+
+    private void closeConnection() throws InterruptedException {
+        LOG.info("Closing connection to end point : {}", endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        connection.close(); // could block
+                        return null;
+                    }
+                });
+        } catch(Exception e) {
+            LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on connection close
+        }
+    }
+
+    private void commitTxn() throws CommitFailure, InterruptedException {
+        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        txnBatch.commit(); // could block
+                        return null;
+                    }
+                });
+        } catch (StreamingException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        } catch (TimeoutException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    private StreamingConnection newConnection(final UserGroupInformation ugi)
+        throws InterruptedException, ConnectFailure {
+        try {
+            return  callWithTimeout(new CallRunner<StreamingConnection>() {
+                    @Override
+                    public StreamingConnection call() throws Exception {
+                        return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
+                    }
+                });
+        } catch(StreamingException e) {
+            throw new ConnectFailure(endPoint, e);
+        } catch(TimeoutException e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+        throws InterruptedException, TxnBatchFailure {
+        LOG.debug("Fetching new Txn Batch for {}", endPoint);
+        TransactionBatch batch = null;
+        try {
+            batch = callWithTimeout(new CallRunner<TransactionBatch>() {
+                @Override
+                public TransactionBatch call() throws Exception {
+                    return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+                }
+            });
+        LOG.debug("Acquired {}. Switching to first txn", batch);
+        batch.beginNextTransaction();
+        } catch(TimeoutException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        } catch(StreamingException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        }
+        return batch;
+    }
+
+    private void closeTxnBatch() throws  InterruptedException {
+        try {
+            LOG.debug("Closing Txn Batch {}", txnBatch);
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if(txnBatch != null) {
+                            txnBatch.close(); // could block
+                        }
+                        return null;
+                    }
+                });
+        } catch(InterruptedException e) {
+            throw e;
+        } catch(Exception e) {
+            LOG.warn("Error closing txn batch "+ txnBatch, e);
+        }
+    }
+
+    /**
+     * Aborts the current Txn.
+     * @throws InterruptedException if interrupted while waiting for the abort call
+     */
+    public void abort() throws InterruptedException {
+        abortTxn();
+    }
+
+    private void abortTxn() throws InterruptedException {
+        LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws StreamingException, InterruptedException {
+                        txnBatch.abort(); // could block
+                        return null;
+                    }
+                });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+        } catch (Exception e) {
+            LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on abort
+        }
+    }
+
+
+    /**
+     * If the current thread has been interrupted, then throws an
+     * exception.
+     * @throws InterruptedException
+     */
+    private static void checkAndThrowInterruptedException()
+        throws InterruptedException {
+        if (Thread.interrupted()) {
+            throw new InterruptedException("Timed out before Hive call was made. "
+                                           + "Your callTimeout might be set too low or Hive calls are "
+                                           + "taking too long.");
+        }
+    }
+
+    /**
+     * Execute the callable on a separate thread and wait for the completion
+     * for the specified amount of time in milliseconds. In case of timeout
+     * cancel the callable and throw an IOException
+     */
+    private <T> T callWithTimeout(final CallRunner<T> callRunner)
+        throws TimeoutException, StreamingException, InterruptedException {
+        Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    return callRunner.call();
+                }
+            });
+        try {
+            if (callTimeout > 0) {
+                return future.get(callTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                return future.get();
+            }
+        } catch (TimeoutException eT) {
+            future.cancel(true);
+            throw eT;
+        } catch (ExecutionException e1) {
+            Throwable cause = e1.getCause();
+            if (cause instanceof IOException) {
+                throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+            } else if (cause instanceof StreamingException) {
+                throw (StreamingException) cause;
+            } else if (cause instanceof InterruptedException) {
+                throw (InterruptedException) cause;
+            } else if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else if (cause instanceof TimeoutException) {
+                throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+            } else {
+                throw new RuntimeException(e1);
+            }
+        }
+    }
+
+    public long getLastUsed() {
+        return lastUsed;
+    }
+
+    private byte[] generateRecord(Tuple tuple) {
+        StringBuilder buf = new StringBuilder();
+        for (Object o: tuple.getValues()) {
+            buf.append(o);
+            buf.append(",");
+        }
+        return buf.toString().getBytes();
+    }
+
+    /**
+     * Simple interface whose <tt>call</tt> method is executed by
+     * {@link #callWithTimeout} on a thread from the callTimeoutPool.
+     * @param <T>
+     */
+    private interface CallRunner<T> {
+        T call() throws Exception;
+    }
+
+    public static class Failure extends Exception {
+        public Failure(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    public static class WriteFailure extends Failure {
+        public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+            super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+        }
+    }
+
+    public static class CommitFailure extends Failure {
+        public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+            super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
+        }
+    }
+
+    public static class ConnectFailure extends Failure {
+        public ConnectFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed connecting to EndPoint " + ep, cause);
+        }
+    }
+
+    public static class TxnBatchFailure extends Failure {
+        public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+        }
+    }
+
+    public static class TxnFailure extends Failure {
+        public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+            super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+        }
+    }
+}

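For orientation, the write/flush/close lifecycle a caller goes through looks roughly like this (partition value, record contents and pool sizing are hypothetical; checked-exception handling is elided for brevity, and ugi is null when Kerberos is not configured):

    ExecutorService pool = Executors.newFixedThreadPool(1);
    HiveEndPoint endPoint = HiveUtils.makeEndPoint(Arrays.asList("CA"), options);
    HiveWriter writer = HiveUtils.makeHiveWriter(endPoint, pool, null, options);
    try {
        writer.write("1,alice,sfo".getBytes()); // one record in the mapper's format
        writer.flush(true);                     // commit the txn and roll to the next one
    } finally {
        writer.close();                         // closes the txn batch and the connection
        pool.shutdown();
    }
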
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
new file mode 100644
index 0000000..6050aa8
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.hive.hcatalog.streaming.*;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HiveState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
+    private HiveOptions options;
+    private Integer currentBatchSize;
+    private ExecutorService callTimeoutPool;
+    private transient Timer heartBeatTimer;
+    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+    private UserGroupInformation ugi = null;
+    private Boolean kerberosEnabled = false;
+    HashMap<HiveEndPoint, HiveWriter> allWriters;
+
+    public HiveState(HiveOptions options) {
+        this.options = options;
+        this.currentBatchSize = 0;
+    }
+
+
+    @Override
+    public void beginCommit(Long txId) {
+    }
+
+    @Override
+    public void commit(Long txId) {
+    }
+
+    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
+        try {
+            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
+                kerberosEnabled = false;
+            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
+                kerberosEnabled = true;
+            } else {
+                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
+                                                   " & KerberosKeytab");
+            }
+
+            if (kerberosEnabled) {
+                try {
+                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
+                } catch(HiveUtils.AuthenticationFailed ex) {
+                    LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
+                    throw new IllegalArgumentException(ex);
+                }
+            }
+
+            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
+            String timeoutName = "hive-bolt-%d";
+            this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                                                                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            heartBeatTimer= new Timer();
+            setupHeartBeatTimer();
+        } catch(Exception e) {
+            LOG.warn("unable to make connection to hive ",e);
+        }
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            writeTuples(tuples);
+        } catch (Exception e) {
+            abortAndCloseWriters();
+            LOG.warn("hive streaming failed.",e);
+            throw new FailedException(e);
+        }
+    }
+
+    private void writeTuples(List<TridentTuple> tuples)
+        throws Exception {
+        if(timeToSendHeartBeat.compareAndSet(true, false)) {
+            enableHeartBeatOnAllWriters();
+        }
+        for (TridentTuple tuple : tuples) {
+            List<String> partitionVals = options.getMapper().mapPartitions(tuple);
+            HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
+            HiveWriter writer = getOrCreateWriter(endPoint);
+            writer.write(options.getMapper().mapRecord(tuple));
+            currentBatchSize++;
+            if(currentBatchSize >= options.getBatchSize()) {
+                flushAllWriters();
+                currentBatchSize = 0;
+            }
+        }
+    }
+
+    private void abortAndCloseWriters() {
+        try {
+            abortAllWriters();
+            closeAllWriters();
+        } catch(InterruptedException e) {
+            LOG.warn("unable to close hive connections. ", e);
+        } catch(IOException ie) {
+            LOG.warn("unable to close hive connections. ", ie);
+        }
+    }
+
+    /**
+     * Aborts the current Txn on all writers.
+     */
+    private void abortAllWriters() throws InterruptedException {
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().abort();
+        }
+    }
+
+
+    /**
+     * Closes all writers and removes them from the cache.
+     */
+    private void closeAllWriters() throws InterruptedException, IOException {
+        //1) Retire writers
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            entry.getValue().close();
+        }
+        //2) Clear cache
+        allWriters.clear();
+    }
+
+    private void setupHeartBeatTimer() {
+        if(options.getHeartBeatInterval()>0) {
+            heartBeatTimer.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        timeToSendHeartBeat.set(true);
+                        setupHeartBeatTimer();
+                    }
+                }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void flushAllWriters()
+        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for(HiveWriter writer: allWriters.values()) {
+            writer.flush(true);
+        }
+    }
+
+    private void enableHeartBeatOnAllWriters() {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.setHeartBeatNeeded();
+        }
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        try {
+            HiveWriter writer = allWriters.get( endPoint );
+            if( writer == null ) {
+                LOG.info("Creating Writer to Hive end point : " + endPoint);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if(allWriters.size() > options.getMaxOpenConnections()){
+                    int retired = retireIdleWriters();
+                    if(retired==0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+
+    }
+
+
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            LOG.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
+
+        //1) Find retirement candidates
+        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for(HiveEndPoint ep : retirees) {
+            try {
+                LOG.info("Closing idle Writer to Hive end point : {}", ep);
+                allWriters.remove(ep).close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+        return count;
+    }
+
+    public void cleanup() {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                LOG.info("Flushing writer to {}", w);
+                w.flush(false);
+                LOG.info("Closing writer to {}", w);
+                w.close();
+            } catch (Exception ex) {
+                LOG.warn("Error while closing writer to " + entry.getKey() +
+                         ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        ExecutorService[] toShutdown = {callTimeoutPool};
+        for (ExecutorService execService : toShutdown) {
+            execService.shutdown();
+            try {
+                while (!execService.isTerminated()) {
+                    execService.awaitTermination(
+                                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException ex) {
+                LOG.warn("shutdown interrupted on " + execService, ex);
+            }
+        }
+        callTimeoutPool = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
new file mode 100644
index 0000000..8f3b9e9
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -0,0 +1,31 @@
+package org.apache.storm.hive.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+
+
+public class HiveStateFactory implements StateFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class);
+    private HiveOptions options;
+
+    public HiveStateFactory(){}
+
+    public HiveStateFactory withOptions(HiveOptions options){
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+        HiveState state = new HiveState(this.options);
+        state.prepare(conf, metrics, partitionIndex, numPartitions);
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
new file mode 100644
index 0000000..b0b32f1
--- /dev/null
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hive.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class HiveUpdater extends BaseStateUpdater<HiveState>{
+    @Override
+    public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

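Wiring the Trident pieces together follows the usual StateFactory/StateUpdater pattern; a sketch (the spout, stream name and field names are hypothetical, and 'hiveOptions' is assumed to be built as in the earlier sketches):

    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("hiveTridentSpout", spout);
    StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
    // Each Trident batch is handed to HiveUpdater, which calls HiveState.updateState().
    stream.partitionPersist(factory, new Fields("id", "name", "city", "state"),
                            new HiveUpdater(), new Fields());
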
http://git-wip-us.apache.org/repos/asf/storm/blob/01ab7b14/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
new file mode 100644
index 0000000..d492819
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.thrift.TException;
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveSetupUtil {
+    public static class RawFileSystem extends RawLocalFileSystem {
+        private static final URI NAME;
+        static {
+            try {
+                NAME = new URI("raw:///");
+            } catch (URISyntaxException se) {
+                throw new IllegalArgumentException("bad uri", se);
+            }
+        }
+
+        @Override
+        public URI getUri() {
+            return NAME;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            File file = pathToFile(path);
+            if (!file.exists()) {
+                throw new FileNotFoundException("Can't find " + path);
+            }
+            // get close enough
+            short mod = 0;
+            if (file.canRead()) {
+                mod |= 0444;
+            }
+            if (file.canWrite()) {
+                mod |= 0200;
+            }
+            if (file.canExecute()) {
+                mod |= 0111;
+            }
+            ShimLoader.getHadoopShims();
+            return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+                                  file.lastModified(), file.lastModified(),
+                                  FsPermission.createImmutable(mod), "owen", "users", path);
+        }
+    }
+
+    private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+    public static HiveConf getHiveConf() {
+        HiveConf conf = new HiveConf();
+        // String metastoreDBLocation = "jdbc:derby:databaseName=/tmp/metastore_db;create=true";
+        // conf.set("javax.jdo.option.ConnectionDriverName","org.apache.derby.jdbc.EmbeddedDriver");
+        // conf.set("javax.jdo.option.ConnectionURL",metastoreDBLocation);
+        conf.set("fs.raw.impl", RawFileSystem.class.getName());
+        conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+        return conf;
+    }
+
+    public static void createDbAndTable(HiveConf conf, String databaseName,
+                                        String tableName, List<String> partVals,
+                                        String[] colNames, String[] colTypes,
+                                        String[] partNames, String dbLocation)
+        throws Exception {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            Database db = new Database();
+            db.setName(databaseName);
+            db.setLocationUri(dbLocation);
+            client.createDatabase(db);
+
+            Table tbl = new Table();
+            tbl.setDbName(databaseName);
+            tbl.setTableName(tableName);
+            tbl.setTableType(TableType.MANAGED_TABLE.toString());
+            StorageDescriptor sd = new StorageDescriptor();
+            sd.setCols(getTableColumns(colNames, colTypes));
+            sd.setNumBuckets(1);
+            sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
+            if(partNames!=null && partNames.length!=0) {
+                tbl.setPartitionKeys(getPartitionKeys(partNames));
+            }
+
+            tbl.setSd(sd);
+
+            sd.setBucketCols(new ArrayList<String>(2));
+            sd.setSerdeInfo(new SerDeInfo());
+            sd.getSerdeInfo().setName(tbl.getTableName());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+
+            sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
+            sd.setInputFormat(OrcInputFormat.class.getName());
+            sd.setOutputFormat(OrcOutputFormat.class.getName());
+
+            Map<String, String> tableParams = new HashMap<String, String>();
+            tbl.setParameters(tableParams);
+            client.createTable(tbl);
+            try {
+                if(partVals!=null && partVals.size() > 0) {
+                    addPartition(client, tbl, partVals);
+                }
+            } catch(AlreadyExistsException e) {
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    // delete db and all tables in it
+    public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+        IMetaStoreClient client = new HiveMetaStoreClient(conf);
+        try {
+            for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
+                client.dropTable(databaseName, table, true, true);
+            }
+            client.dropDatabase(databaseName);
+        } catch (TException e) {
+            // best-effort cleanup: ignore failures while dropping test tables/databases
+        } finally {
+            client.close();
+        }
+    }
+
+    private static void addPartition(IMetaStoreClient client, Table tbl
+                                     , List<String> partValues)
+        throws IOException, TException {
+        Partition part = new Partition();
+        part.setDbName(tbl.getDbName());
+        part.setTableName(tbl.getTableName());
+        StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
+        sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys(), partValues));
+        part.setSd(sd);
+        part.setValues(partValues);
+        client.add_partition(part);
+    }
+
+    private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
+        if(partKeys.size()!=partVals.size()) {
+            throw new IllegalArgumentException("Partition values:" + partVals +
+                                               ", does not match the partition Keys in table :" + partKeys );
+        }
+        StringBuffer buff = new StringBuffer(partKeys.size()*20);
+        int i=0;
+        for(FieldSchema schema : partKeys) {
+            buff.append(schema.getName());
+            buff.append("=");
+            buff.append(partVals.get(i));
+            if(i!=partKeys.size()-1) {
+                buff.append(Path.SEPARATOR);
+            }
+            ++i;
+        }
+        return buff.toString();
+    }
+
+    private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i<colNames.length; ++i) {
+            fields.add(new FieldSchema(colNames[i], colTypes[i], ""));
+        }
+        return fields;
+    }
+
+    private static List<FieldSchema> getPartitionKeys(String[] partNames) {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for (int i=0; i < partNames.length; ++i) {
+           fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
+        }
+        return fields;
+    }
+
+}
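
For context, here is a minimal sketch of how these setup helpers might be driven from a test, assuming they live in the HiveSetupUtil test class included in this patch set; the database, table, column and location names below are illustrative, not part of the commit:

```java
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.Arrays;
import java.util.List;

public class HiveSetupExample {
    public static void main(String[] args) throws Exception {
        // HiveConf with DbTxnManager, concurrency support and the raw file system shim from getHiveConf()
        HiveConf conf = HiveSetupUtil.getHiveConf();

        String dbName  = "testdb";                     // illustrative names only
        String tblName = "test_table";
        String[] colNames  = {"id", "name"};
        String[] colTypes  = {"int", "string"};
        String[] partNames = {"city"};
        List<String> partVals = Arrays.asList("sunnyvale");

        try {
            // creates the database, an ORC-backed table and one partition
            HiveSetupUtil.createDbAndTable(conf, dbName, tblName, partVals,
                                           colNames, colTypes, partNames, "/tmp/hive-" + dbName);
        } finally {
            // drops every table in the database and then the database itself
            HiveSetupUtil.dropDB(conf, dbName);
        }
    }
}
```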


[06/13] storm git commit: STORM-539. Storm hive bolt and trident state. Hive version to 0.14 and update README.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state. Hive version to 0.14 and
update README.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e85b79a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e85b79a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e85b79a9

Branch: refs/heads/master
Commit: e85b79a9b1d08d8a9bc534bb96be84b25a1ee831
Parents: 81772b2
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Dec 15 15:23:30 2014 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 08:53:44 2015 -0800

----------------------------------------------------------------------
 external/storm-hive/README.md | 10 +++++-----
 pom.xml                       |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e85b79a9/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index 6461462..aec0a89 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -13,7 +13,7 @@
   ```
   
 
-## HiveBolt
+## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
 
 HiveBolt streams tuples directly into Hive. Tuples are written using Hive Transactions. 
 Partitions to which HiveBolt streams can either be created or pre-created, or optionally
@@ -32,8 +32,8 @@ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
    There are two implementations available
  
    
-   1) DelimitedRecordHiveMapper
-   2) JsonRecordHiveMapper
+   + DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
+   + JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
    
    ```java
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
@@ -51,7 +51,7 @@ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
 |withPartitionFields| field names in a tuple can be mapped to Hive table partitions | Fields |
 |withTimeAsPartitionField| users can select the system time as a partition field in the Hive table| String. Date format|
 
-### HiveOptions
+### HiveOptions (org.apache.storm.hive.common.HiveOptions)
   
 HiveBolt takes in HiveOptions as a constructor arg.
 
@@ -82,7 +82,7 @@ HiveOptions params
 
 
  
-## HiveState
+## HiveState (org.apache.storm.hive.trident.HiveState)
 
 Hive Trident state follows a similar pattern to HiveBolt; it also takes HiveOptions as a constructor arg.
 

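For readers following along, here is a rough, hedged sketch of the mapper/options/bolt wiring the README section above describes; the fluent setter names follow the option tables shown there, while the metastore URI, field, database and table names are assumptions for illustration, not part of this commit:

```java
import backtype.storm.tuple.Fields;

import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;

public class HiveBoltWiring {
    public static HiveBolt buildHiveBolt() {
        // map tuple fields to table columns, plus a partition column and a time-based partition
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields("id", "name"))
            .withPartitionFields(new Fields("city"))
            .withTimeAsPartitionField("YYYY/MM/DD");   // String date format, per the mapper table above

        // metastore URI, database and table names are made up for this sketch
        HiveOptions hiveOptions = new HiveOptions("thrift://localhost:9083", "testdb", "test_table", mapper)
            .withTxnsPerBatch(10)
            .withBatchSize(100)
            .withIdleTimeout(10);

        return new HiveBolt(hiveOptions);
    }
}
```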
http://git-wip-us.apache.org/repos/asf/storm/blob/e85b79a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6265d9c..f5757ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
         <zookeeper.version>3.4.6</zookeeper.version>
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
-        <hive.version>0.13.0</hive.version>
+        <hive.version>0.14.0</hive.version>
         <hadoop.version>2.6.0</hadoop.version>
     </properties>
 


[12/13] storm git commit: Merge branch 'STORM-539-V2' of https://github.com/harshach/incubator-storm into STORM-539

Posted by sr...@apache.org.
Merge branch 'STORM-539-V2' of https://github.com/harshach/incubator-storm into STORM-539


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59eb7f95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59eb7f95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59eb7f95

Branch: refs/heads/master
Commit: 59eb7f95a243cf23b7cbbbec843f7bdb7fac37f8
Parents: 0a3a0aa e62c163
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 25 08:16:11 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 25 08:16:11 2015 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +-
 external/storm-hive/README.md                   | 113 +++++
 external/storm-hive/pom.xml                     | 143 +++++++
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 289 +++++++++++++
 .../bolt/mapper/DelimitedRecordHiveMapper.java  | 143 +++++++
 .../storm/hive/bolt/mapper/HiveMapper.java      |  81 ++++
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  | 132 ++++++
 .../apache/storm/hive/common/HiveOptions.java   | 146 +++++++
 .../org/apache/storm/hive/common/HiveUtils.java |  76 ++++
 .../apache/storm/hive/common/HiveWriter.java    | 420 +++++++++++++++++++
 .../apache/storm/hive/trident/HiveState.java    | 306 ++++++++++++++
 .../storm/hive/trident/HiveStateFactory.java    |  31 ++
 .../apache/storm/hive/trident/HiveUpdater.java  |  14 +
 .../apache/storm/hive/bolt/HiveSetupUtil.java   | 220 ++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    | 150 +++++++
 .../hive/bolt/HiveTopologyPartitioned.java      | 153 +++++++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 330 +++++++++++++++
 .../storm/hive/common/TestHiveWriter.java       | 193 +++++++++
 .../storm/hive/trident/TridentHiveTopology.java | 190 +++++++++
 pom.xml                                         |   4 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 21 files changed, 3150 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[09/13] storm git commit: STORM-539. Storm hive bolt and trident state. Added Sponsors to README.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state. Added Sponsors to README.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54b6a69f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54b6a69f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54b6a69f

Branch: refs/heads/master
Commit: 54b6a69f044c7a93b7871e6237d33fc612cc90f7
Parents: b0cbb49
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 12 20:07:33 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 12 20:07:33 2015 -0800

----------------------------------------------------------------------
 external/storm-hive/README.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54b6a69f/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index aec0a89..eddb4a9 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -101,7 +101,9 @@ Hive Trident state also follows similar pattern to HiveBolt it takes in HiveOpti
  ```
    
  
- 
+## Committer Sponsors
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))
 
 
 


[13/13] storm git commit: Added STORM-539 to Changelog.

Posted by sr...@apache.org.
Added STORM-539 to Changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14a302f5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14a302f5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14a302f5

Branch: refs/heads/master
Commit: 14a302f545ef3ef95aac5177bc9ac441626377b4
Parents: 59eb7f9
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 25 10:35:41 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 25 10:35:41 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/14a302f5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f774de..e988c61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-539: Storm Hive Connector.
  * STORM-616: Storm JDBC Connector.
  * STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages (thanks tedxia)
  * STORM-641: Add total number of topologies to api/v1/cluster/summary.


[03/13] storm git commit: STORM-539. Storm hive bolt and trident state. Hive version to 0.14 and update README.

Posted by sr...@apache.org.
STORM-539. Storm hive bolt and trident state. Hive version to 0.14 and
update README.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dfb8e370
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dfb8e370
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dfb8e370

Branch: refs/heads/master
Commit: dfb8e3709d27691a2f97bbc3e49491f13a9769d1
Parents: 01ab7b1
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Dec 15 15:23:30 2014 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Mon Dec 15 15:23:30 2014 -0800

----------------------------------------------------------------------
 external/storm-hive/README.md | 10 +++++-----
 pom.xml                       |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dfb8e370/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index 6461462..aec0a89 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -13,7 +13,7 @@
   ```
   
 
-## HiveBolt
+## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
 
 HiveBolt streams tuples directly into Hive. Tuples are written using Hive Transactions. 
 Partitions to which HiveBolt streams can either be created or pre-created, or optionally
@@ -32,8 +32,8 @@ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
    There are two implementations available
  
    
-   1) DelimitedRecordHiveMapper
-   2) JsonRecordHiveMapper
+   + DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
+   + JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
    
    ```java
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
@@ -51,7 +51,7 @@ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
 |withPartitionFields| field names in a tuple can be mapped to Hive table partitions | Fields |
 |withTimeAsPartitionField| users can select the system time as a partition field in the Hive table| String. Date format|
 
-### HiveOptions
+### HiveOptions (org.apache.storm.hive.common.HiveOptions)
   
 HiveBolt takes in HiveOptions as a constructor arg.
 
@@ -82,7 +82,7 @@ HiveOptions params
 
 
  
-## HiveState
+## HiveState (org.apache.storm.hive.trident.HiveState)
 
 Hive Trident state follows a similar pattern to HiveBolt; it also takes HiveOptions as a constructor arg.
 

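As an equally hedged sketch of the Trident side: HiveState is wired through the HiveStateFactory and HiveUpdater classes added in this patch set. The withOptions(...) setter and the field names below are assumptions made for illustration only:

```java
import backtype.storm.tuple.Fields;

import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.trident.HiveStateFactory;
import org.apache.storm.hive.trident.HiveUpdater;

import storm.trident.Stream;

public class HiveTridentWiring {
    public static void attachHiveState(Stream stream) {
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields("id", "name"));

        // metastore URI, database and table names are illustrative only
        HiveOptions hiveOptions = new HiveOptions("thrift://localhost:9083", "testdb", "test_table", mapper)
            .withTxnsPerBatch(10)
            .withBatchSize(100);

        // HiveStateFactory is assumed to expose a fluent withOptions(...) setter
        HiveStateFactory factory = new HiveStateFactory().withOptions(hiveOptions);

        // persist each batch of the stream into Hive via HiveUpdater
        stream.partitionPersist(factory, new Fields("id", "name"), new HiveUpdater(), new Fields());
    }
}
```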
http://git-wip-us.apache.org/repos/asf/storm/blob/dfb8e370/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 603f772..2f02860 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
         <conjure.version>2.1.3</conjure.version>
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
-        <hive.version>0.13.0</hive.version>
+        <hive.version>0.14.0</hive.version>
         <hadoop.version>2.6.0</hadoop.version>
     </properties>