Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/25 19:52:52 UTC
[04/13] storm git commit: STORM-539. Storm hive bolt and trident state.
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
new file mode 100644
index 0000000..e9ecbd0
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopology {
+ static final String USER_SPOUT_ID = "user-spout";
+ static final String BOLT_ID = "my-hive-bolt";
+ static final String TOPOLOGY_NAME = "hive-test-topology1";
+
+ public static void main(String[] args) throws Exception {
+ String metaStoreURI = args[0];
+ String dbName = args[1];
+ String tblName = args[2];
+ String[] colNames = {"id","name","phone","street","city","state"};
+ Config config = new Config();
+ config.setNumWorkers(1);
+ UserDataSpout spout = new UserDataSpout();
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames));
+ HiveOptions hiveOptions;
+ if (args.length == 6) {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(100)
+ .withIdleTimeout(10)
+ .withKerberosKeytab(args[4])
+ .withKerberosPrincipal(args[5]);
+ } else {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(100)
+ .withIdleTimeout(10);
+ }
+
+ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(USER_SPOUT_ID, spout, 1);
+ // UserDataSpout --> HiveBolt
+ builder.setBolt(BOLT_ID, hiveBolt, 1)
+ .shuffleGrouping(USER_SPOUT_ID);
+ if (args.length == 3) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(20);
+ cluster.killTopology(TOPOLOGY_NAME);
+ System.out.println("cluster begin to shutdown");
+ cluster.shutdown();
+ System.out.println("cluster shutdown");
+ System.exit(0);
+ } else if(args.length >= 4) {
+ StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) { // ignored: best-effort wait in a test driver
+ }
+ }
+
+ public static class UserDataSpout extends BaseRichSpout {
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String[] sentences = {
+ "1,user1,123456,street1,sunnyvale,ca",
+ "2,user2,123456,street2,sunnyvale,ca",
+ "3,user3,123456,street3,san jose,ca",
+ "4,user4,123456,street4,san jose,ca",
+ };
+ private int index = 0;
+ private int count = 0;
+ private long total = 0L;
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id","name","phone","street","city","state"));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ }
+
+ public void nextTuple() {
+ String[] user = sentences[index].split(",");
+ Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, values);
+ this.collector.emit(values, msgId);
+ index++;
+ if (index >= sentences.length) {
+ index = 0;
+ }
+ count++;
+ total++;
+ if(count > 1000){
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ Thread.yield();
+ }
+
+ public void ack(Object msgId) {
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+}
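
Note for anyone trying HiveTopology outside of LocalCluster: Hive streaming
ingest, which HiveBolt uses underneath, only accepts writes to bucketed,
transactional ORC tables, and the metastore has to be configured for the
DbTxnManager. A minimal setup sketch assuming the column layout above; the
class name, metastore argument, and bucket count are illustrative, not part
of this patch:

    import org.apache.hadoop.hive.cli.CliSessionState;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class CreateStreamingTable {
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();
            // Point at the same metastore the topology will write through.
            conf.setVar(HiveConf.ConfVars.METASTOREURIS, args[0]);
            SessionState.start(new CliSessionState(conf));
            // Streaming ingest requires bucketed + ORC + transactional.
            new Driver(conf).run("CREATE TABLE testdb.test_table ("
                + " id INT, name STRING, phone STRING,"
                + " street STRING, city STRING, state STRING)"
                + " CLUSTERED BY (id) INTO 4 BUCKETS"
                + " STORED AS ORC"
                + " TBLPROPERTIES ('transactional'='true')");
        }
    }
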
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
new file mode 100644
index 0000000..c3197c2
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class HiveTopologyPartitioned {
+ static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
+ static final String BOLT_ID = "my-hive-bolt-partitioned";
+ static final String TOPOLOGY_NAME = "hive-test-topology-partitioned";
+
+ public static void main(String[] args) throws Exception {
+ String metaStoreURI = args[0];
+ String dbName = args[1];
+ String tblName = args[2];
+ String[] partNames = {"city","state"};
+ String[] colNames = {"id","name","phone","street"};
+ Config config = new Config();
+ config.setNumWorkers(1);
+ UserDataSpout spout = new UserDataSpout();
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions;
+ if (args.length == 6) {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(1000)
+ .withIdleTimeout(10)
+ .withKerberosKeytab(args[4])
+ .withKerberosPrincipal(args[5]);
+ } else {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(1000)
+ .withIdleTimeout(10);
+ }
+
+ HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(USER_SPOUT_ID, spout, 1);
+ // UserDataSpout --> HiveBolt
+ builder.setBolt(BOLT_ID, hiveBolt, 1)
+ .shuffleGrouping(USER_SPOUT_ID);
+ if (args.length == 3) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+ waitForSeconds(20);
+ cluster.killTopology(TOPOLOGY_NAME);
+ System.out.println("cluster begin to shutdown");
+ cluster.shutdown();
+ System.out.println("cluster shutdown");
+ System.exit(0);
+ } else if(args.length >= 4) {
+ StormSubmitter.submitTopology(args[3], config, builder.createTopology());
+ } else {
+ System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+ }
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) { // ignored: best-effort wait in a test driver
+ }
+ }
+
+ public static class UserDataSpout extends BaseRichSpout {
+ private ConcurrentHashMap<UUID, Values> pending;
+ private SpoutOutputCollector collector;
+ private String[] sentences = {
+ "1,user1,123456,street1,sunnyvale,ca",
+ "2,user2,123456,street2,sunnyvale,ca",
+ "3,user3,123456,street3,san jose,ca",
+ "4,user4,123456,street4,san jose,ca",
+ };
+ private int index = 0;
+ private int count = 0;
+ private long total = 0L;
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id","name","phone","street","city","state"));
+ }
+
+ public void open(Map config, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.pending = new ConcurrentHashMap<UUID, Values>();
+ }
+
+ public void nextTuple() {
+ String[] user = sentences[index].split(",");
+ Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
+ UUID msgId = UUID.randomUUID();
+ this.pending.put(msgId, values);
+ this.collector.emit(values, msgId);
+ index++;
+ if (index >= sentences.length) {
+ index = 0;
+ }
+ count++;
+ total++;
+ if(count > 1000){
+ Utils.sleep(1000);
+ count = 0;
+ System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+ }
+ }
+
+ public void ack(Object msgId) {
+ this.pending.remove(msgId);
+ }
+
+ public void fail(Object msgId) {
+ System.out.println("**** RESENDING FAILED TUPLE");
+ this.collector.emit(this.pending.get(msgId), msgId);
+ }
+ }
+}
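
To submit either topology to a live cluster instead of LocalCluster, an
example invocation; the jar path, metastore host, and topology name below
are placeholders. Note that for the partitioned flavor, city and state must
be partition columns of the target table rather than regular columns:

    storm jar storm-hive-<version>-tests.jar \
        org.apache.storm.hive.bolt.HiveTopologyPartitioned \
        thrift://metastore-host:9083 testdb test_table my-hive-topology
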
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
new file mode 100644
index 0000000..e7e875e
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import junit.framework.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+
+import org.apache.hive.hcatalog.streaming.*;
+
+public class TestHiveBolt {
+ final static String dbName = "testdb";
+ final static String tblName = "test_table";
+ final static String dbName1 = "testdb1";
+ final static String tblName1 = "test_table1";
+ final static String PART1_NAME = "city";
+ final static String PART2_NAME = "state";
+ final static String[] partNames = { PART1_NAME, PART2_NAME };
+ final String partitionVals = "sunnyvale,ca";
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+ final String[] colNames = {COL1,COL2};
+ final String[] colNames1 = {COL2,COL1};
+ private String[] colTypes = {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+ private final HiveConf conf;
+ private final Driver driver;
+ private final int port;
+ final String metaStoreURI;
+ private String dbLocation;
+ private Config config = new Config();
+ private HiveBolt bolt;
+ private final static boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+ @Mock
+ private IOutputCollector collector;
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+
+ public TestHiveBolt() throws Exception {
+ port=9083;
+ dbLocation = "";
+ //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true";
+ metaStoreURI = null;
+ conf = HiveSetupUtil.getHiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+
+ // driver.init();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ HiveSetupUtil.dropDB(conf, dbName);
+ if(WINDOWS) {
+ dbLocation = dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+ } else {
+ dbLocation = "raw://" + dbFolder.newFolder(dbName + ".db").getCanonicalPath();
+ }
+ HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals.split(",")),
+ colNames, colTypes, partNames, dbLocation);
+ System.out.println("done");
+ }
+
+ @Test
+ public void testEndpointConnection() throws Exception {
+ // 1) Basic
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+ , Arrays.asList(partitionVals.split(",")));
+ StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+ connection.close();
+ // 2) Leave partition unspecified
+ endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ endPt.newConnection(false, null).close(); // should not throw
+ }
+
+ @Test
+ public void testWithByteArrayIdAndMessage()
+ throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(2);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Integer id = 100;
+ String msg = "test-123";
+ String city = "sunnyvale";
+ String state = "ca";
+ checkRecordCountInTable(tblName,dbName,0);
+ for (int i=0; i < 4; i++) {
+ Tuple tuple = generateTestTuple(id,msg,city,state);
+ bolt.execute(tuple);
+ verify(collector).ack(tuple);
+ }
+ checkRecordCountInTable(tblName, dbName, 4);
+ bolt.cleanup();
+ }
+
+
+ @Test
+ public void testWithoutPartitions()
+ throws Exception {
+ HiveSetupUtil.dropDB(conf,dbName1);
+ HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+ colNames,colTypes,null, dbLocation);
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(2)
+ .withAutoCreatePartitions(false);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Integer id = 100;
+ String msg = "test-123";
+ String city = "sunnyvale";
+ String state = "ca";
+ checkRecordCountInTable(tblName1,dbName1,0);
+ for (int i=0; i < 4; i++) {
+ Tuple tuple = generateTestTuple(id,msg,city,state);
+ bolt.execute(tuple);
+ verify(collector).ack(tuple);
+ }
+ bolt.cleanup();
+ checkRecordCountInTable(tblName1, dbName1, 4);
+ }
+
+ @Test
+ public void testWithTimeformat()
+ throws Exception {
+ String[] partNames1 = {"date"};
+ String timeFormat = "yyyy/MM/dd";
+ HiveSetupUtil.dropDB(conf,dbName1);
+ HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1,null,
+ colNames,colTypes,partNames1, dbLocation);
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withTimeAsPartitionField(timeFormat);
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(1);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Integer id = 100;
+ String msg = "test-123";
+ Date d = new Date();
+ SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
+ String today=parseDate.format(d.getTime());
+ checkRecordCountInTable(tblName1,dbName1,0);
+ for (int i=0; i < 2; i++) {
+ Tuple tuple = generateTestTuple(id,msg,null,null);
+ bolt.execute(tuple);
+ verify(collector).ack(tuple);
+ }
+ checkDataWritten(tblName1, dbName1, "100,test-123,"+today, "100,test-123,"+today);
+ bolt.cleanup();
+ }
+
+ @Test
+ public void testData()
+ throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(1);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+ //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+ bolt.execute(tuple1);
+ verify(collector).ack(tuple1);
+ //bolt.execute(tuple2);
+ //verify(collector).ack(tuple2);
+ checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+ bolt.cleanup();
+ }
+
+ @Test
+ public void testJsonWriter()
+ throws Exception {
+ // json record doesn't need columns to be in the same order
+ // as table in hive.
+ JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+ .withColumnFields(new Fields(colNames1))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(2)
+ .withBatchSize(1);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
+ //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
+ bolt.execute(tuple1);
+ verify(collector).ack(tuple1);
+ //bolt.execute(tuple2);
+ //verify(collector).ack(tuple2);
+ checkDataWritten(tblName, dbName, "1,SJC,Sunnyvale,CA");
+ bolt.cleanup();
+ }
+
+
+ @Test
+ public void testMultiPartitionTuples()
+ throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(10);
+ bolt = new HiveBolt(hiveOptions);
+ bolt.prepare(config,null,new OutputCollector(collector));
+ Integer id = 1;
+ String msg = "test";
+ String city = "San Jose";
+ String state = "CA";
+ checkRecordCountInTable(tblName,dbName,0);
+ for(int i=0; i < 100; i++) {
+ Tuple tuple = generateTestTuple(id,msg,city,state);
+ bolt.execute(tuple);
+ verify(collector).ack(tuple);
+ }
+ checkRecordCountInTable(tblName, dbName, 100);
+ bolt.cleanup();
+ }
+
+ private void checkRecordCountInTable(String tableName,String dbName,int expectedCount)
+ throws CommandNeedRetryException, IOException {
+ int count = listRecordsInTable(tableName,dbName).size();
+ Assert.assertEquals(expectedCount, count);
+ }
+
+ private ArrayList<String> listRecordsInTable(String tableName,String dbName)
+ throws CommandNeedRetryException, IOException {
+ driver.compile("select * from " + dbName + "." + tableName);
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ return res;
+ }
+
+ private void checkDataWritten(String tableName,String dbName,String... row)
+ throws CommandNeedRetryException, IOException {
+ ArrayList<String> results = listRecordsInTable(tableName,dbName);
+ for(int i = 0; i < row.length && i < results.size(); i++) {
+ String resultRow = results.get(i).replace("\t",",");
+ System.out.println(resultRow);
+ assertEquals(row[i],resultRow);
+ }
+ }
+
+ private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) {
+ TopologyBuilder builder = new TopologyBuilder();
+ GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+ new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+ @Override
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ return new Fields("id", "msg","city","state");
+ }
+ };
+ return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+ }
+
+}
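
A note on the option values used throughout these tests, hedged since it
rests on Hive streaming semantics rather than anything introduced here:
withBatchSize(n) is the number of tuples written per Hive transaction, and
withTxnsPerBatch(m) is how many transactions HiveWriter requests from the
metastore at a time. With withTxnsPerBatch(2).withBatchSize(2), the four
tuples in testWithByteArrayIdAndMessage consume exactly one transaction
batch (2 transactions x 2 tuples each), and every transaction has committed
by the time checkRecordCountInTable(tblName, dbName, 4) runs.
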
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
new file mode 100644
index 0000000..63b1949
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.HiveMapper;
+import org.apache.storm.hive.bolt.HiveSetupUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+
+public class TestHiveWriter {
+ final static String dbName = "testdb";
+ final static String tblName = "test_table2";
+
+ public static final String PART1_NAME = "city";
+ public static final String PART2_NAME = "state";
+ public static final String[] partNames = { PART1_NAME, PART2_NAME };
+ final String[] partitionVals = {"sunnyvale","ca"};
+ final String[] colNames = {"id","msg"};
+ private String[] colTypes = { "int", "string" };
+ private final int port;
+ private final String metaStoreURI;
+ private final HiveConf conf;
+ private ExecutorService callTimeoutPool;
+ private final Driver driver;
+ int timeout = 10000; // msec
+ UserGroupInformation ugi = null;
+
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+ public TestHiveWriter() throws Exception {
+ port = 9083;
+ metaStoreURI = null;
+ int callTimeoutPoolSize = 1;
+ callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+ // 1) Start metastore
+ conf = HiveSetupUtil.getHiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ if(metaStoreURI!=null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+ }
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+ driver.init();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // 1) Setup tables
+ HiveSetupUtil.dropDB(conf, dbName);
+ String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+ HiveSetupUtil.createDbAndTable(conf, dbName, tblName, Arrays.asList(partitionVals),
+ colNames,colTypes, partNames, dbLocation);
+ }
+
+ @Test
+ public void testInstantiate() throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ ,callTimeoutPool, mapper, ugi);
+ writer.close();
+ }
+
+ @Test
+ public void testWriteBasic() throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ , callTimeoutPool, mapper, ugi);
+ writeTuples(writer,mapper,3);
+ writer.flush(false);
+ writer.close();
+ checkRecordCountInTable(dbName,tblName,3);
+ }
+
+ @Test
+ public void testWriteMultiFlush() throws Exception {
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ , callTimeoutPool, mapper, ugi);
+ Tuple tuple = generateTestTuple("1","abc");
+ writer.write(mapper.mapRecord(tuple));
+ checkRecordCountInTable(dbName,tblName,0);
+ writer.flush(true);
+
+ tuple = generateTestTuple("2","def");
+ writer.write(mapper.mapRecord(tuple));
+ writer.flush(true);
+
+ tuple = generateTestTuple("3","ghi");
+ writer.write(mapper.mapRecord(tuple));
+ writer.flush(true);
+ writer.close();
+ checkRecordCountInTable(dbName,tblName,3);
+ }
+
+ private Tuple generateTestTuple(Object id, Object msg) {
+ TopologyBuilder builder = new TopologyBuilder();
+ GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+ new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+ @Override
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ return new Fields("id", "msg");
+ }
+ };
+ return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+ }
+
+ private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
+ throws HiveWriter.WriteFailure, InterruptedException {
+ Integer id = 100;
+ String msg = "test-123";
+ for (int i = 1; i <= count; i++) {
+ Tuple tuple = generateTestTuple(id,msg);
+ writer.write(mapper.mapRecord(tuple));
+ }
+ }
+
+ private void checkRecordCountInTable(String dbName,String tableName,int expectedCount)
+ throws CommandNeedRetryException, IOException {
+ int count = listRecordsInTable(dbName,tableName).size();
+ Assert.assertEquals(expectedCount, count);
+ }
+
+ private ArrayList<String> listRecordsInTable(String dbName,String tableName)
+ throws CommandNeedRetryException, IOException {
+ driver.compile("select * from " + dbName + "." + tableName);
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ return res;
+ }
+
+}
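
One reading of the flush calls above, hedged because it depends on
HiveWriter internals: the boolean argument asks the writer to roll to the
next transaction in the current batch after committing, so each flush(true)
in testWriteMultiFlush commits the pending record and leaves the writer
ready for the next write, while the checkRecordCountInTable(dbName,tblName,0)
before the first flush passes because records stay invisible to readers
until their transaction commits.
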
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
new file mode 100644
index 0000000..bc607f3
--- /dev/null
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.trident;
+
+
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.task.TopologyContext;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+
+public class TridentHiveTopology {
+ public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) {
+ int batchSize = 100;
+ FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+ spout.setCycle(true);
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("hiveTridentspout1",spout);
+ String[] partNames = {"city","state"};
+ String[] colNames = {"id","name","phone","street"};
+ Fields hiveFields = new Fields("id","name","phone","street","city","state");
+ DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+ .withColumnFields(new Fields(colNames))
+ .withPartitionFields(new Fields(partNames));
+ HiveOptions hiveOptions;
+ if (keytab != null && principal != null) {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(batchSize)
+ .withIdleTimeout(10)
+ .withCallTimeout(30000)
+ .withKerberosKeytab((String)keytab)
+ .withKerberosPrincipal((String)principal);
+ } else {
+ hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+ .withTxnsPerBatch(10)
+ .withBatchSize(batchSize)
+ .withCallTimeout(30000)
+ .withIdleTimeout(10);
+ }
+ StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+ stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+ return topology.build();
+ }
+
+ public static void waitForSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) { // ignored: best-effort wait in a test driver
+ }
+ }
+
+ public static void main(String[] args) {
+ String metaStoreURI = args[0];
+ String dbName = args[1];
+ String tblName = args[2];
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if(args.length == 3) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+ System.out.println("waiting for 60 seconds");
+ waitForSeconds(60);
+ System.out.println("killing topology");
+ cluster.killTopology("tridenHiveTopology");
+ System.out.println("cluster shutdown");
+ cluster.shutdown();
+ System.out.println("cluster shutdown");
+ System.exit(0);
+ } else if(args.length == 4) {
+ try {
+ StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
+ } catch(Exception e) {
+ System.out.println("Failed to submit topology "+e);
+ }
+ } else if (args.length == 6) {
+ try {
+ StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
+ } catch(Exception e) {
+ System.out.println("Failed to submit topology "+e);
+ }
+ } else {
+ System.out.println("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]");
+ }
+ }
+
+ public static class FixedBatchSpout implements IBatchSpout {
+ int maxBatchSize;
+ HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+ private Values[] outputs = {
+ new Values("1","user1","123456","street1","sunnyvale","ca"),
+ new Values("2","user2","123456","street2","sunnyvale","ca"),
+ new Values("3","user3","123456","street3","san jose","ca"),
+ new Values("4","user4","123456","street4","san jose","ca"),
+ };
+ private int index = 0;
+ boolean cycle = false;
+
+ public FixedBatchSpout(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+
+ public void setCycle(boolean cycle) {
+ this.cycle = cycle;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("id","name","phone","street","city","state");
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ index = 0;
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> batch = this.batches.get(batchId);
+ if(batch == null){
+ batch = new ArrayList<List<Object>>();
+ if(index>=outputs.length && cycle) {
+ index = 0;
+ }
+ for(int i=0; i < maxBatchSize; index++, i++) {
+ if(index == outputs.length){
+ index=0;
+ }
+ batch.add(outputs[index]);
+ }
+ this.batches.put(batchId, batch);
+ }
+ for(List<Object> list : batch){
+ collector.emit(list);
+ }
+ }
+
+ @Override
+ public void ack(long batchId) {
+ this.batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Map getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ }
+
+}
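
A design note on FixedBatchSpout: emitBatch caches each generated batch in
the batches map keyed by batchId and only drops it on ack, so a batch that
Trident replays after a failure is re-emitted with exactly the same tuples.
That deterministic replay is what lets partitionPersist with HiveState give
batch-level exactly-once writes on top of Hive transactions.
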
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af86bd0..6265d9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@
<module>external/storm-kafka</module>
<module>external/storm-hdfs</module>
<module>external/storm-hbase</module>
+ <module>external/storm-hive</module>
</modules>
<scm>
@@ -212,7 +213,8 @@
<zookeeper.version>3.4.6</zookeeper.version>
<clojure-data-codec.version>0.1.0</clojure-data-codec.version>
<clojure-contrib.version>1.2.0</clojure-contrib.version>
-
+ <hive.version>0.13.0</hive.version>
+ <hadoop.version>2.6.0</hadoop.version>
</properties>
<profiles>
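
For topology authors, the new module is consumed like the other external
modules. A sketch of the dependency block, with the version matching the
Storm release in use:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hive</artifactId>
        <version>${storm.version}</version>
    </dependency>
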
http://git-wip-us.apache.org/repos/asf/storm/blob/81772b22/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index ebff853..51cfd14 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -103,6 +103,20 @@
<include>README.*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-hive/target</directory>
+ <outputDirectory>external/storm-hive</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-hive</directory>
+ <outputDirectory>external/storm-hive</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
</fileSets>