Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/25 19:52:52 UTC

[04/13] storm git commit: STORM-539. Storm hive bolt and trident state.

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
new file mode 100644
index 0000000..e9ecbd0
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopology {
+    static final String USER_SPOUT_ID = "user-spout";
+    static final String BOLT_ID = "my-hive-bolt";
+    static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] colNames = {"id","name","phone","street","city","state"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(100)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}
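For comparison with the delimited mapper wired up above, the same bolt can also be fed by the JsonRecordHiveMapper this commit adds (see TestHiveBolt further down). The snippet below is only a sketch: it assumes the imports from HiveTopology.java plus org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper, and the metastore URI, database, table and field names are placeholders, not part of the commit.

    JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
        .withColumnFields(new Fields("id", "name", "phone", "street"))   // JSON records need not match the table's column order
        .withPartitionFields(new Fields("city", "state"));
    HiveOptions jsonOptions = new HiveOptions("thrift://localhost:9083", "testdb", "test_table", jsonMapper)
        .withTxnsPerBatch(10)
        .withBatchSize(100)
        .withIdleTimeout(10)
        .withCallTimeout(30000)               // also exercised by the Trident test below
        .withAutoCreatePartitions(false);     // also exercised by TestHiveBolt below
    HiveBolt jsonHiveBolt = new HiveBolt(jsonOptions);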

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
new file mode 100644
index 0000000..c3197c2
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopologyPartitioned {
+    static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
+    static final String BOLT_ID = "my-hive-bolt-partitioned";
+    static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
+
+    public static void main(String[] args) throws Exception {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Config config = new Config();
+        config.setNumWorkers(1);
+        UserDataSpout spout = new UserDataSpout();
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (args.length == 6) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10)
+                .withKerberosKeytab(args[4])
+                .withKerberosPrincipal(args[5]);
+        } else {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(1000)
+                .withIdleTimeout(10);
+        }
+
+        HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(USER_SPOUT_ID, spout, 1);
+        // UserDataSpout --> HiveBolt
+        builder.setBolt(BOLT_ID, hiveBolt, 1)
+                .shuffleGrouping(USER_SPOUT_ID);
+        if (args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+            waitForSeconds(20);
+            cluster.killTopology(TOPOLOGY_NAME);
+            System.out.println("cluster begin to shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length >= 4) {
+            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+        }
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sentences = {
+                "1,user1,123456,street1,sunnyvale,ca",
+                "2,user2,123456,street2,sunnyvale,ca",
+                "3,user3,123456,street3,san jose,ca",
+                "4,user4,123456,street4,san jose,ca",
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id","name","phone","street","city","state"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String[] user = sentences[index].split(",");
+            Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sentences.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if(count > 1000){
+                Utils.sleep(1000);
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}
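The partitioned topology above takes its partition values ("city", "state") from tuple fields. The mapper added in this commit can also derive the partition from the current time, as TestHiveBolt.testWithTimeformat exercises further down. A minimal sketch, again with placeholder metastore URI, database, table and field names:

    DelimitedRecordHiveMapper datePartitionMapper = new DelimitedRecordHiveMapper()
        .withColumnFields(new Fields("id", "name", "phone", "street"))
        .withTimeAsPartitionField("yyyy/MM/dd");   // partition value is the current date in this format
    HiveOptions datePartitionOptions = new HiveOptions("thrift://localhost:9083", "testdb", "test_table", datePartitionMapper)
        .withTxnsPerBatch(10)
        .withBatchSize(100);
    HiveBolt datePartitionBolt = new HiveBolt(datePartitionOptions);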

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
new file mode 100644
index 0000000..e7e875e
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import junit.framework.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+
+import org.apache.hive.hcatalog.streaming.*;
+
+public class TestHiveBolt {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table";
+    final static String dbName1 = "testdb1";
+    final static String tblName1 = "test_table1";
+    final static String PART1_NAME = "city";
+    final static String PART2_NAME = "state";
+    final static String[] partNames = { PART1_NAME, PART2_NAME };
+    final String partitionVals = "sunnyvale,ca";
+    private static final String COL1 = "id";
+    private static final String COL2 = "msg";
+    final String[] colNames = {COL1,COL2};
+    final String[] colNames1 = {COL2,COL1};
+    private String[] colTypes = {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    private final HiveConf conf;
+    private final Driver driver;
+    private final int port;
+    final String metaStoreURI;
+    private String dbLocation;
+    private Config config = new Config();
+    private HiveBolt bolt;
+    private final static boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+    @Mock
+    private IOutputCollector collector;
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+
+    public TestHiveBolt() throws Exception {
+        port=9083;
+        dbLocation = "";
+        //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true";
+        metaStoreURI = null;
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+
+        // driver.init();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        HiveSetupUtil.dropDB(conf, dbName);
+        if(WINDOWS) {
+            dbLocation = dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        } else {
+            dbLocation = "raw://" + dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+        }
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals.split(",")),
+                colNames, colTypes, partNames, dbLocation);
+        System.out.println("done");
+    }
+
+    @Test
+    public void testEndpointConnection() throws Exception {
+        // 1) Basic
+        HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+                                              , Arrays.asList(partitionVals.split(",")));
+        StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+        connection.close();
+        // 2) Leave partition unspecified
+        endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+        endPt.newConnection(false, null).close(); // should not throw
+    }
+
+    @Test
+    public void testWithByteArrayIdandMessage()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName,dbName,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 4);
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testWithoutPartitions()
+        throws Exception {
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,null, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(2)
+            .withAutoCreatePartitions(false);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        String city = "sunnyvale";
+        String state = "ca";
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 4; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        bolt.cleanup();
+        checkRecordCountInTable(tblName1, dbName1, 4);
+    }
+
+    @Test
+    public void testWithTimeformat()
+        throws Exception {
+        String[] partNames1 = {"date"};
+        String timeFormat = "yyyy/MM/dd";
+        HiveSetupUtil.dropDB(conf,dbName1);
+        HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+                                       colNames,colTypes,partNames1, dbLocation);
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField(timeFormat);
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 100;
+        String msg = "test-123";
+        Date d = new Date();
+        SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
+        String today=parseDate.format(d.getTime());
+        checkRecordCountInTable(tblName1,dbName1,0);
+        for (int i=0; i < 2; i++) {
+            Tuple tuple = generateTestTuple(id,msg,null,null);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testData()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testJsonWriter()
+        throws Exception {
+        // json record doesn't need columns to be in the same order
+        // as table in hive.
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+        //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+        bolt.execute(tuple1);
+        verify(collector).ack(tuple1);
+        //bolt.execute(tuple2);
+        //verify(collector).ack(tuple2);
+        checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+        bolt.cleanup();
+    }
+
+
+    @Test
+    public void testMultiPartitionTuples()
+        throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+            .withTxnsPerBatch(10)
+            .withBatchSize(10);
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config,null,new OutputCollector(collector));
+        Integer id = 1;
+        String msg = "test";
+        String city = "San Jose";
+        String state = "CA";
+        checkRecordCountInTable(tblName,dbName,0);
+        for(int i=0; i < 100; i++) {
+            Tuple tuple = generateTestTuple(id,msg,city,state);
+            bolt.execute(tuple);
+            verify(collector).ack(tuple);
+        }
+        checkRecordCountInTable(tblName, dbName, 100);
+        bolt.cleanup();
+    }
+
+    private void checkRecordCountInTable(String tableName,String dbName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(tableName,dbName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String tableName,String dbName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+    private void checkDataWritten(String tableName,String dbName,String... row)
+        throws CommandNeedRetryException, IOException {
+        ArrayList<String> results = listRecordsInTable(tableName,dbName);
+        for(int i = 0; i < row.length && results.size() > 0; i++) {
+            String resultRow = results.get(i).replace("\t",",");
+            System.out.println(resultRow);
+            assertEquals(row[i],resultRow);
+        }
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                                             new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg","city","state");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
new file mode 100644
index 0000000..63b1949
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.storm.hive.bolt.HiveSetupUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+
+public class TestHiveWriter {
+    final static String dbName = "testdb";
+    final static String tblName = "test_table2";
+
+    public static final String PART1_NAME = "city";
+    public static final String PART2_NAME = "state";
+    public static final String[] partNames = { PART1_NAME, PART2_NAME };
+    final String[] partitionVals = {"sunnyvale","ca"};
+    final String[] colNames = {"id","msg"};
+    private String[] colTypes = { "int", "string" };
+    private final int port;
+    private final String metaStoreURI;
+    private final HiveConf conf;
+    private ExecutorService callTimeoutPool;
+    private final Driver driver;
+    int timeout = 10000; // msec
+    UserGroupInformation ugi = null;
+
+    @Rule
+    public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+    public TestHiveWriter() throws Exception {
+        port = 9083;
+        metaStoreURI = null;
+        int callTimeoutPoolSize = 1;
+        callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+                                                       new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+        // 1) Start metastore
+        conf = HiveSetupUtil.getHiveConf();
+        TxnDbUtil.setConfValues(conf);
+        TxnDbUtil.cleanDb();
+        TxnDbUtil.prepDb();
+
+        if(metaStoreURI!=null) {
+            conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+        }
+        SessionState.start(new CliSessionState(conf));
+        driver = new Driver(conf);
+        driver.init();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // 1) Setup tables
+        HiveSetupUtil.dropDB(conf, dbName);
+        String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+        HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals),
+                                       colNames,colTypes, partNames, dbLocation);
+    }
+
+    @Test
+    public void testInstantiate() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           ,callTimeoutPool, mapper, ugi);
+        writer.close();
+    }
+
+    @Test
+    public void testWriteBasic() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        writeTuples(writer,mapper,3);
+        writer.flush(false);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    @Test
+    public void testWriteMultiFlush() throws Exception {
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+
+        HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+        HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+                                           , callTimeoutPool, mapper, ugi);
+        Tuple tuple = generateTestTuple("1","abc");
+        writer.write(mapper.mapRecord(tuple));
+        checkRecordCountInTable(dbName,tblName,0);
+        writer.flush(true);
+
+        tuple = generateTestTuple("2","def");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+
+        tuple = generateTestTuple("3","ghi");
+        writer.write(mapper.mapRecord(tuple));
+        writer.flush(true);
+        writer.close();
+        checkRecordCountInTable(dbName,tblName,3);
+    }
+
+    private Tuple generateTestTuple(Object id, Object msg) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                                                              new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, String streamId) {
+                    return new Fields("id", "msg");
+                }
+            };
+        return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+    }
+
+    private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
+            throws HiveWriter.WriteFailure, InterruptedException {
+        Integer id = 100;
+        String msg = "test-123";
+        for (int i = 1; i <= count; i++) {
+            Tuple tuple = generateTestTuple(id,msg);
+            writer.write(mapper.mapRecord(tuple));
+        }
+    }
+
+    private void checkRecordCountInTable(String dbName,String tableName,int expectedCount)
+        throws CommandNeedRetryException, IOException {
+        int count = listRecordsInTable(dbName,tableName).size();
+        Assert.assertEquals(expectedCount, count);
+    }
+
+    private  ArrayList<String> listRecordsInTable(String dbName,String tableName)
+        throws CommandNeedRetryException, IOException {
+        driver.compile("select * from " + dbName + "." + tableName);
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        return res;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
new file mode 100644
index 0000000..bc607f3
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.trident;
+
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.task.TopologyContext;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+
+public class TridentHiveTopology {
+    public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("hiveTridentspout1",spout);
+        String[] partNames = {"city","state"};
+        String[] colNames = {"id","name","phone","street"};
+        Fields hiveFields = new Fields("id","name","phone","street","city","state");
+        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions;
+        if (keytab != null && principal != null) {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withIdleTimeout(10)
+                .withCallTimeout(30000)
+                .withKerberosKeytab((String)keytab)
+                .withKerberosPrincipal((String)principal);
+        } else  {
+            hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(10)
+                .withBatchSize(batchSize)
+                .withCallTimeout(30000)
+                .withIdleTimeout(10);
+        }
+        StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+        TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+        return topology.build();
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    public static void main(String[] args) {
+        String metaStoreURI = args[0];
+        String dbName = args[1];
+        String tblName = args[2];
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if(args.length == 3) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            System.out.println("waiting for 60 seconds");
+            waitForSeconds(60);
+            System.out.println("killing topology");
+            cluster.killTopology("tridenHiveTopology");
+            System.out.println("cluster shutdown");
+            cluster.shutdown();
+            System.out.println("cluster shutdown");
+            System.exit(0);
+        } else if(args.length == 4) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else if (args.length == 6) {
+            try {
+                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
+            } catch(Exception e) {
+                System.out.println("Failed to submit topology "+e);
+            }
+        } else {
+            System.out.println("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]");
+        }
+    }
+
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+            new Values("1","user1","123456","street1","sunnyvale","ca"),
+            new Values("2","user2","123456","street2","sunnyvale","ca"),
+            new Values("3","user3","123456","street3","san jose","ca"),
+            new Values("4","user4","123456","street4","san jose","ca"),
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("id","name","phone","street","city","state");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if(batch == null){
+                batch = new ArrayList<List<Object>>();
+                if(index>=outputs.length && cycle) {
+                    index = 0;
+                }
+                for(int i=0; i < maxBatchSize; index++, i++) {
+                    if(index == outputs.length){
+                        index=0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for(List<Object> list : batch){
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af86bd0..6265d9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>
+        <module>external/storm-hive</module>
     </modules>
 
     <scm>
@@ -212,7 +213,8 @@
         <zookeeper.version>3.4.6</zookeeper.version>
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
-
+        <hive.version>0.13.0</hive.version>
+        <hadoop.version>2.6.0</hadoop.version>
     </properties>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index ebff853..51cfd14 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -103,6 +103,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive/target</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-hive</directory>
+            <outputDirectory>external/storm-hive</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
 
     </fileSets>