You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:06 UTC
[03/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
new file mode 100644
index 0000000..38c3fcb
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a HDFS sink based on the URI and properties. The URI has the format of hdfs://host:port/path-to-file
+ * The properties are in JSON format which specifies the name / path of the hdfs file and etc.
+ *
+ * <p>Recognized table properties:
+ * <ul>
+ * <li>{@code hdfs.file.path} - output directory (default {@code /storm})</li>
+ * <li>{@code hdfs.file.name} - file name pattern (default {@code $TIME.$NUM.txt})</li>
+ * <li>{@code hdfs.rotation.size.kb} - rotate the output file after this many kilobytes</li>
+ * <li>{@code hdfs.rotation.time.seconds} - rotate the output file after this many seconds</li>
+ * </ul>
+ * At least one rotation property is required; when both are present the size-based policy is used.
+ * Only the sink (consumer) side is supported; {@code getProducer()} throws.
+ */
+public class HdfsDataSourcesProvider implements DataSourcesProvider {
+
+    /**
+     * Reads a table property as a string.
+     *
+     * <p>{@link Properties#getProperty(String)} ignores values that are not {@code String}s, so a
+     * setting given as a non-string value (e.g. a number, if the JSON table properties were parsed
+     * into typed values) would silently look absent. Such values are converted with
+     * {@code toString()} instead; String values behave exactly as with {@code getProperty}.
+     *
+     * @param props        the table properties
+     * @param key          the property name
+     * @param defaultValue value returned when the property is absent (may be null)
+     * @return the property value as a string, or {@code defaultValue}
+     */
+    private static String getStringProperty(Properties props, String key, String defaultValue) {
+        String value = props.getProperty(key);
+        if (value == null) {
+            Object raw = props.get(key);
+            if (raw != null) {
+                value = raw.toString();
+            }
+        }
+        return value != null ? value : defaultValue;
+    }
+
+    /** Write-only Trident data source that appends serialized rows to files on HDFS. */
+    private static class HdfsTridentDataSource implements ISqlTridentDataSource {
+        private final String url;
+        private final Properties props;
+        private final IOutputSerializer serializer;
+
+        private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+            this.url = url;
+            this.props = props;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+        }
+
+        /**
+         * Builds the HDFS state factory/updater pair from the table properties.
+         *
+         * @throws IllegalArgumentException if neither rotation property is configured
+         * @throws NumberFormatException    if a rotation property is not a number
+         */
+        @Override
+        public SqlTridentConsumer getConsumer() {
+            FileNameFormat fileNameFormat = new SimpleFileNameFormat()
+                    .withPath(getStringProperty(props, "hdfs.file.path", "/storm"))
+                    .withName(getStringProperty(props, "hdfs.file.name", "$TIME.$NUM.txt"));
+
+            RecordFormat recordFormat = new TridentRecordFormat(serializer);
+
+            String size = getStringProperty(props, "hdfs.rotation.size.kb", null);
+            String interval = getStringProperty(props, "hdfs.rotation.time.seconds", null);
+            Preconditions.checkArgument(size != null || interval != null, "Hdfs data source must contain file rotation config");
+
+            // Size-based rotation takes precedence when both settings are present.
+            FileRotationPolicy rotationPolicy;
+            if (size != null) {
+                rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
+            } else {
+                rotationPolicy = new TimedRotationPolicy(Float.parseFloat(interval), TimedRotationPolicy.TimeUnit.SECONDS);
+            }
+
+            HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                    .withFileNameFormat(fileNameFormat)
+                    .withRecordFormat(recordFormat)
+                    .withRotationPolicy(rotationPolicy)
+                    .withFsUrl(url);
+
+            StateFactory stateFactory = new HdfsStateFactory().withOptions(options);
+            StateUpdater stateUpdater = new HdfsUpdater();
+
+            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+        }
+    }
+
+    /** Record format that writes each tuple using the table's configured output serializer. */
+    private static class TridentRecordFormat implements RecordFormat {
+        private final IOutputSerializer serializer;
+
+        private TridentRecordFormat(IOutputSerializer serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public byte[] format(TridentTuple tuple) {
+            //TODO we should handle '\n'. ref DelimitedRecordFormat
+            ByteBuffer buffer = serializer.write(tuple.getValues(), null);
+            // Copy only the readable region. ByteBuffer.array() would return the whole backing array
+            // (ignoring position, limit and arrayOffset) and throws for direct or read-only buffers.
+            // duplicate() leaves the serializer's buffer position untouched.
+            byte[] bytes = new byte[buffer.remaining()];
+            buffer.duplicate().get(bytes);
+            return bytes;
+        }
+
+    }
+
+    @Override
+    public String scheme() {
+        return "hdfs";
+    }
+
+    /** Non-Trident data sources are not supported for HDFS. */
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                                List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Creates the HDFS Trident data source. The full URI string is used as the HDFS filesystem URL, and
+     * rows are serialized with {@code outputFormatClass} via {@link SerdeUtils#getSerializer}.
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  Properties properties, List<FieldInfo> fields) {
+        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+        return new HdfsTridentDataSource(uri.toString(), properties, serializer);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..5fac84f
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.hdfs.HdfsDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
new file mode 100644
index 0000000..1473438
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the HDFS Trident data source obtained from {@link DataSourcesRegistry} for an hdfs:// URI,
+ * using a MiniDFSCluster that is started before and shut down after each test.
+ */
+public class TestHdfsDataSourcesProvider {
+    private static final List<FieldInfo> FIELDS = ImmutableList.of(
+            new FieldInfo("ID", int.class, true),
+            new FieldInfo("val", String.class, false));
+    private static final Properties TBL_PROPERTIES = new Properties();
+
+    static {
+        TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
+        TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
+        TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
+    }
+
+    // Per-test state (JUnit creates a new instance per test), so these need not be static.
+    private String hdfsURI;
+    private MiniDFSCluster hdfsCluster;
+
+    @Before
+    public void setup() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.trash.interval", "10");
+        conf.setBoolean("dfs.permissions", true);
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        hdfsCluster = builder.build();
+        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+    }
+
+    @After
+    public void shutDown() throws IOException {
+        // setup() may have failed before the cluster was built; don't mask that failure with an NPE.
+        if (hdfsCluster != null) {
+            hdfsCluster.shutdown();
+            hdfsCluster = null;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHdfsSink() throws IOException {
+        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+                URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
+        Assert.assertNotNull(ds);
+
+        ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+        Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
+        Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
+
+        HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+        StateUpdater stateUpdater = consumer.getStateUpdater();
+
+        // Replace the real file options with a mock so the test can verify what gets written.
+        HdfsFileOptions options = mock(HdfsFileOptions.class);
+        Whitebox.setInternalState(state, "options", options);
+
+        List<TridentTuple> tupleList = mockTupleList();
+
+        for (TridentTuple t : tupleList) {
+            stateUpdater.updateState(state, Collections.singletonList(t), null);
+            verify(options).execute(Collections.singletonList(t));
+        }
+    }
+
+    /** Two mocked rows: (1, "2") and (2, "3"), with column 0 as the key. */
+    private static List<TridentTuple> mockTupleList() {
+        List<TridentTuple> tupleList = new ArrayList<>();
+        TridentTuple t0 = mock(TridentTuple.class);
+        TridentTuple t1 = mock(TridentTuple.class);
+        doReturn(1).when(t0).get(0);
+        doReturn(2).when(t1).get(0);
+        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+        tupleList.add(t0);
+        tupleList.add(t1);
+        return tupleList;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/pom.xml b/sql/storm-sql-external/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..881343d
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits from the top-level Storm POM, three directories up. -->
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <!-- Storm SQL data source provider for the "kafka" scheme (registered via META-INF/services). -->
+ <artifactId>storm-sql-kafka</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <!-- Storm core and the SQL runtime use 'provided' scope: compiled against, but not packaged with this module. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- The test-jar of storm-sql-runtime (its test classes), on the test classpath only. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <!-- Trident Kafka spout/state classes (org.apache.storm.kafka.*) used by KafkaDataSourcesProvider; also 'provided'. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Kafka artifact selected by a property; no version here, presumably managed by the parent POM (confirm). -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${storm.kafka.artifact.id}</artifactId>
+ </dependency>
+ <!-- Kafka client library (KafkaProducer / ProducerRecord are used in the tests). -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${storm.kafka.version}</version>
+ </dependency>
+ <!-- Test-only: JUnit and Mockito, used by TestKafkaDataSourcesProvider. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <!-- Non-standard layout: main code in src/jvm, tests in src/test, and resources (including the
+      META-INF/services provider registration) in src/resources. -->
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..2aa98ba
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
+ * of the Kafka broker.
+ *
+ * <p>The ZooKeeper port defaults to 2181 when the URI omits it. The table must declare a primary key; its column
+ * value becomes the Kafka message key. Writing to the table requires a {@code producer} property holding a map of
+ * Kafka producer settings that includes {@code bootstrap.servers}.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+    private static final int DEFAULT_ZK_PORT = 2181;
+
+    /** Maps a tuple to a Kafka record: the primary-key column is the key, the serialized row is the value. */
+    private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+        private final int primaryKeyIndex;
+        private final IOutputSerializer serializer;
+
+        private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+            this.primaryKeyIndex = primaryKeyIndex;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public Object getKeyFromTuple(TridentTuple tuple) {
+            return tuple.get(primaryKeyIndex);
+        }
+
+        @Override
+        public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+            return serializer.write(tuple.getValues(), null);
+        }
+    }
+
+    /** Trident data source that reads with an opaque Kafka spout and writes through a Kafka producer state. */
+    private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+        private final TridentKafkaConfig conf;
+        private final String topic;
+        private final int primaryKeyIndex;
+        private final Properties props;
+        private final IOutputSerializer serializer;
+
+        private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                       Properties props, IOutputSerializer serializer) {
+            this.conf = conf;
+            this.topic = topic;
+            this.primaryKeyIndex = primaryKeyIndex;
+            this.props = props;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            return new OpaqueTridentKafkaSpout(conf);
+        }
+
+        /**
+         * Builds the Kafka state factory/updater pair used to write rows to {@link #topic}.
+         *
+         * @throws IllegalArgumentException if the table properties have no {@code producer} map
+         * @throws IllegalStateException    if the producer settings lack {@code bootstrap.servers}
+         */
+        @Override
+        public SqlTridentConsumer getConsumer() {
+            Object producerConfig = props.get("producer");
+            // Previously a missing "producer" entry caused an NPE (or a ClassCastException for a non-map);
+            // reject it with the same message used for empty table properties.
+            Preconditions.checkArgument(producerConfig instanceof Map,
+                    "Writable Kafka Table " + topic + " must contain producer config");
+
+            // Assemble the producer properties in a fresh object instead of mutating the shared table
+            // properties on every call. Contents are unchanged: all table properties plus the producer map.
+            Properties producerProps = new Properties();
+            producerProps.putAll(props);
+            producerProps.putAll((Map<?, ?>) producerConfig);
+            Preconditions.checkState(producerProps.containsKey("bootstrap.servers"),
+                    "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+
+            SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer);
+
+            TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                    .withKafkaTopicSelector(new DefaultTopicSelector(topic))
+                    .withProducerProperties(producerProps)
+                    .withTridentTupleToKafkaMapper(mapper);
+
+            TridentKafkaUpdater stateUpdater = new TridentKafkaUpdater();
+
+            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+        }
+    }
+
+    @Override
+    public String scheme() {
+        return "kafka";
+    }
+
+    /** Non-Trident data sources are not supported for Kafka. */
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                                List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Creates the Kafka Trident data source from a {@code kafka://zkhost[:port]/broker_path?topic=...} URI.
+     *
+     * @throws NullPointerException  if the URI has no {@code topic} query parameter
+     * @throws IllegalStateException if no field is marked as primary key
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  Properties properties, List<FieldInfo> fields) {
+        int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+        ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+        Map<String, String> values = parseURIParams(uri.getQuery());
+        String topic = values.get("topic");
+        Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+        TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+        List<String> fieldNames = new ArrayList<>();
+        int primaryIndex = -1;
+        for (int i = 0; i < fields.size(); ++i) {
+            FieldInfo f = fields.get(i);
+            fieldNames.add(f.name());
+            if (f.isPrimary()) {
+                // If several fields are flagged primary, the last one wins.
+                primaryIndex = i;
+            }
+        }
+        Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+        Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
+        conf.scheme = new SchemeAsMultiScheme(scheme);
+        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+
+        return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, serializer);
+    }
+
+    /**
+     * Splits a query string of the form {@code k1=v1&k2=v2} into a map. Pairs without '=' are ignored,
+     * only the first '=' separates key from value, and a later duplicate key overwrites an earlier one.
+     *
+     * @param query the (already decoded) query component of the URI, possibly null
+     * @return a mutable map of parameters, empty when {@code query} is null
+     */
+    private static Map<String, String> parseURIParams(String query) {
+        HashMap<String, String> res = new HashMap<>();
+        if (query == null) {
+            return res;
+        }
+
+        String[] params = query.split("&");
+        for (String p : params) {
+            String[] v = p.split("=", 2);
+            if (v.length > 1) {
+                res.put(v[0], v[1]);
+            }
+        }
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..0cde492
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+/**
+ * Tests that the Kafka Trident data source built from a kafka:// URI sends one record per tuple, keyed by the
+ * primary-key column and carrying the JSON-serialized row.
+ */
+public class TestKafkaDataSourcesProvider {
+    private static final List<FieldInfo> FIELDS = ImmutableList.of(
+            new FieldInfo("ID", int.class, true),
+            new FieldInfo("val", String.class, false));
+    private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+    private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+    private static final Properties TBL_PROPERTIES = new Properties();
+
+    static {
+        Map<String,Object> map = new HashMap<>();
+        map.put("bootstrap.servers", "localhost:9092");
+        map.put("acks", "1");
+        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        TBL_PROPERTIES.put("producer", map);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testKafkaSink() {
+        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+                URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
+        Assert.assertNotNull(ds);
+
+        ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+        Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
+        Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
+
+        // Swap in a mock producer so no broker is needed and each send() can be verified.
+        TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+        KafkaProducer producer = mock(KafkaProducer.class);
+        doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
+        Whitebox.setInternalState(state, "producer", producer);
+
+        List<TridentTuple> tupleList = mockTupleList();
+        for (TridentTuple t : tupleList) {
+            state.updateState(Collections.singletonList(t), null);
+            verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+        }
+        verifyNoMoreInteractions(producer);
+    }
+
+    /** Two mocked rows: (1, "2") and (2, "3"), with column 0 as the primary key. */
+    private static List<TridentTuple> mockTupleList() {
+        List<TridentTuple> tupleList = new ArrayList<>();
+        TridentTuple t0 = mock(TridentTuple.class);
+        TridentTuple t1 = mock(TridentTuple.class);
+        doReturn(1).when(t0).get(0);
+        doReturn(2).when(t1).get(0);
+        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+        tupleList.add(t0);
+        tupleList.add(t1);
+        return tupleList;
+    }
+
+    /** Matches a ProducerRecord whose key is the tuple's primary key and whose value is the serialized tuple. */
+    private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
+        private static final int PRIMARY_INDEX = 0;
+        private final TridentTuple tuple;
+
+        private KafkaMessageMatcher(TridentTuple tuple) {
+            this.tuple = tuple;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean matches(Object o) {
+            ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object, ByteBuffer>) o;
+            // Compare keys by value: '!=' compared boxed Integer references and only worked
+            // because small Integer values are cached.
+            if (!Objects.equals(m.key(), tuple.get(PRIMARY_INDEX))) {
+                return false;
+            }
+            ByteBuffer expected = SERIALIZER.write(tuple.getValues(), null);
+            return expected.equals(m.value());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
new file mode 100644
index 0000000..218d89b
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-mongodb</artifactId>
+
+ <developers>
+ <developer>
+ <id>vesense</id>
+ <name>Xin Wang</name>
+ <email>data.xinwang@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-mongodb</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
new file mode 100644
index 0000000..60d52d1
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a MongoDB sink based on the URI and properties. The URI has the format of
+ * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
+ * The properties are in JSON format. They must specify the name of the MongoDB collection
+ * ("collection.name") and may name the document field that receives the serialized tuple
+ * ("trident.ser.field", default "tridentSerField").
+ *
+ * <p>Only the Trident consumer (sink) side is implemented: {@code construct} and
+ * {@code getProducer()} throw {@link UnsupportedOperationException}.
+ */
+public class MongoDataSourcesProvider implements DataSourcesProvider {
+
+    /** Sink-only Trident data source that writes each tuple as one serialized MongoDB document. */
+    private static class MongoTridentDataSource implements ISqlTridentDataSource {
+        private final String url;
+        private final Properties props;
+        private final IOutputSerializer serializer;
+
+        private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+            this.url = url;
+            this.props = props;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+        }
+
+        /**
+         * Builds the MongoState factory and updater for this table.
+         *
+         * @throws IllegalArgumentException if "collection.name" is not configured
+         */
+        @Override
+        public SqlTridentConsumer getConsumer() {
+            // Validate the key that is actually required. Merely checking that props was
+            // non-empty let a table without "collection.name" through, and a null collection
+            // name was then handed to MongoState.Options.
+            Preconditions.checkArgument(props.getProperty("collection.name") != null,
+                    "Writable MongoDB must contain collection config");
+            String serField = props.getProperty("trident.ser.field", "tridentSerField");
+            MongoMapper mapper = new TridentMongoMapper(serField, serializer);
+
+            MongoState.Options options = new MongoState.Options()
+                    .withUrl(url)
+                    .withCollectionName(props.getProperty("collection.name"))
+                    .withMapper(mapper);
+
+            StateFactory stateFactory = new MongoStateFactory(options);
+            StateUpdater stateUpdater = new MongoStateUpdater();
+
+            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+        }
+    }
+
+    /** Maps a tuple to a document holding its serialized values in a single binary field. */
+    private static class TridentMongoMapper implements MongoMapper {
+        private final String serField;
+        private final IOutputSerializer serializer;
+
+        private TridentMongoMapper(String serField, IOutputSerializer serializer) {
+            this.serField = serField;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public Document toDocument(ITuple tuple) {
+            java.nio.ByteBuffer serialized = serializer.write(tuple.getValues(), null);
+            // Copy only the readable region [position, limit). ByteBuffer.array() returns the
+            // whole backing array regardless of position/limit/arrayOffset and throws for
+            // read-only or direct buffers.
+            byte[] array = new byte[serialized.remaining()];
+            serialized.duplicate().get(array);
+            Document document = new Document();
+            document.append(serField, array);
+            return document;
+        }
+
+        /** Key-based documents are not produced by this sink; always returns null. */
+        @Override
+        public Document toDocumentByKeys(List<Object> keys) {
+            return null;
+        }
+    }
+
+    @Override
+    public String scheme() {
+        return "mongodb";
+    }
+
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                                List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Creates the Trident sink. The serializer is resolved from {@code outputFormatClass}
+     * using the table's field names.
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  Properties properties, List<FieldInfo> fields) {
+        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+        return new MongoTridentDataSource(uri.toString(), properties, serializer);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..e46d794
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.mongodb.MongoDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
new file mode 100644
index 0000000..3b15345
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class TestMongoDataSourcesProvider {
+    private static final List<FieldInfo> FIELDS = ImmutableList.of(
+            new FieldInfo("ID", int.class, true),
+            new FieldInfo("val", String.class, false));
+    private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+    private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+    private static final Properties TBL_PROPERTIES = new Properties();
+
+    static {
+        TBL_PROPERTIES.put("collection.name", "collection1");
+        TBL_PROPERTIES.put("trident.ser.field", "tridentSerField");
+    }
+
+    /**
+     * Checks that a "mongodb" URI yields MongoDB state factory/updater classes. Also checks that
+     * each tuple is inserted as a document carrying the JSON-serialized tuple values.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMongoSink() {
+        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+                URI.create("mongodb://127.0.0.1:27017/test"), null, null, TBL_PROPERTIES, FIELDS);
+        Assert.assertNotNull(ds);
+
+        ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+        Assert.assertEquals(MongoStateFactory.class, consumer.getStateFactory().getClass());
+        Assert.assertEquals(MongoStateUpdater.class, consumer.getStateUpdater().getClass());
+
+        MongoState state = (MongoState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+        StateUpdater stateUpdater = consumer.getStateUpdater();
+
+        // Swap the real client for a mock so no MongoDB server is needed.
+        MongoDBClient mongoClient = mock(MongoDBClient.class);
+        Whitebox.setInternalState(state, "mongoClient", mongoClient);
+
+        List<TridentTuple> tupleList = mockTupleList();
+
+        for (TridentTuple t : tupleList) {
+            stateUpdater.updateState(state, Collections.singletonList(t), null);
+            verify(mongoClient).insert(argThat(new MongoArgMatcher(t)), eq(true));
+        }
+
+        verifyNoMoreInteractions(mongoClient);
+    }
+
+    private static List<TridentTuple> mockTupleList() {
+        List<TridentTuple> tupleList = new ArrayList<>();
+        TridentTuple t0 = mock(TridentTuple.class);
+        TridentTuple t1 = mock(TridentTuple.class);
+        doReturn(1).when(t0).get(0);
+        doReturn(2).when(t1).get(0);
+        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+        tupleList.add(t0);
+        tupleList.add(t1);
+        return tupleList;
+    }
+
+    /**
+     * Matches an inserted document list whose first document stores, in the configured
+     * serialization field, exactly the bytes SERIALIZER produces for the tuple.
+     */
+    private static class MongoArgMatcher extends ArgumentMatcher<List<Document>> {
+        private final TridentTuple tuple;
+
+        private MongoArgMatcher(TridentTuple tuple) {
+            this.tuple = tuple;
+        }
+
+        @Override
+        public boolean matches(Object o) {
+            // Report unexpected argument shapes as "no match" instead of throwing
+            // ClassCastException/IndexOutOfBoundsException from inside the matcher.
+            if (!(o instanceof List) || ((List<?>) o).isEmpty()) {
+                return false;
+            }
+            Object first = ((List<?>) o).get(0);
+            if (!(first instanceof Document)) {
+                return false;
+            }
+            Object stored = ((Document) first).get(TBL_PROPERTIES.getProperty("trident.ser.field"));
+            if (!(stored instanceof byte[])) {
+                return false;
+            }
+            ByteBuffer expected = SERIALIZER.write(tuple.getValues(), null);
+            return expected.equals(ByteBuffer.wrap((byte[]) stored));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/pom.xml b/sql/storm-sql-external/storm-sql-redis/pom.xml
new file mode 100644
index 0000000..32ce432
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-redis</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-redis</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
new file mode 100644
index 0000000..68933b2
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.redis;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.tuple.ITuple;
+import redis.clients.util.JedisURIHelper;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a Redis sink based on the URI and properties. The URI has the format of
+ * redis://:[password]@[host]:[port]/[dbIdx]. Only host is mandatory; the port defaults to 6379
+ * and the other parts are optional.
+ *
+ * <p>The properties are in JSON format. They configure the Redis data type ("data.type", required),
+ * an optional additional key ("data.additional.key"), the client timeout ("redis.timeout",
+ * default 2000) and cluster mode ("use.redis.cluster", default "false").
+ * Please note that when "use.redis.cluster" is "true", cluster discovery is only done from the given
+ * URI, and the password and database index from the URI are not passed to the cluster config.
+ */
+public class RedisDataSourcesProvider implements DataSourcesProvider {
+    private static final int DEFAULT_REDIS_PORT = 6379;
+    private static final int DEFAULT_TIMEOUT = 2000;
+
+    /**
+     * Sink-only Trident data source. Subclasses choose the state factory/updater pair
+     * (single node vs. cluster).
+     */
+    private abstract static class AbstractRedisTridentDataSource implements ISqlTridentDataSource, Serializable {
+        protected abstract StateFactory newStateFactory();
+
+        protected abstract StateUpdater newStateUpdater(RedisStoreMapper storeMapper);
+
+        private final Properties props;
+        private final List<FieldInfo> fields;
+        private final IOutputSerializer serializer;
+
+        AbstractRedisTridentDataSource(Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+            this.props = props;
+            this.fields = fields;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+        }
+
+        @Override
+        public SqlTridentConsumer getConsumer() {
+            RedisDataTypeDescription dataTypeDescription = getDataTypeDesc(props);
+
+            RedisStoreMapper storeMapper = new TridentRedisStoreMapper(dataTypeDescription, fields, serializer);
+
+            StateFactory stateFactory = newStateFactory();
+            StateUpdater stateUpdater = newStateUpdater(storeMapper);
+
+            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+        }
+
+        /**
+         * Reads the Redis data type (case-insensitive) and optional additional key from the properties.
+         *
+         * @throws IllegalArgumentException if "data.type" is missing or names no known data type
+         */
+        private RedisDataTypeDescription getDataTypeDesc(Properties props) {
+            Preconditions.checkArgument(props.containsKey("data.type"),
+                    "Redis data source must contain \"data.type\" config");
+
+            // Upper-case with Locale.ROOT. Under a default locale such as Turkish, 'i' maps to a
+            // dotted capital I, and valueOf would reject otherwise valid names.
+            RedisDataTypeDescription.RedisDataType dataType = RedisDataTypeDescription.RedisDataType.valueOf(
+                    props.getProperty("data.type").toUpperCase(java.util.Locale.ROOT));
+            String additionalKey = props.getProperty("data.additional.key");
+
+            return new RedisDataTypeDescription(dataType, additionalKey);
+        }
+    }
+
+    /** Writes through RedisClusterState. */
+    private static class RedisClusterTridentDataSource extends AbstractRedisTridentDataSource {
+        private final JedisClusterConfig config;
+
+        RedisClusterTridentDataSource(JedisClusterConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+            super(props, fields, serializer);
+            this.config = config;
+        }
+
+        @Override
+        protected StateFactory newStateFactory() {
+            return new RedisClusterState.Factory(config);
+        }
+
+        @Override
+        protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
+            return new RedisClusterStateUpdater(storeMapper);
+        }
+    }
+
+    /** Writes through the single-node RedisState. */
+    private static class RedisTridentDataSource extends AbstractRedisTridentDataSource {
+        private final JedisPoolConfig config;
+
+        RedisTridentDataSource(JedisPoolConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+            super(props, fields, serializer);
+            this.config = config;
+        }
+
+        @Override
+        protected StateFactory newStateFactory() {
+            return new RedisState.Factory(config);
+        }
+
+        @Override
+        protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
+            return new RedisStateUpdater(storeMapper);
+        }
+    }
+
+    @Override
+    public String scheme() {
+        return "redis";
+    }
+
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Creates a single-node or cluster Redis sink depending on "use.redis.cluster".
+     *
+     * @throws IllegalArgumentException if the URI is not a valid Redis URI
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) {
+        Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);
+
+        String host = uri.getHost();
+        int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_REDIS_PORT;
+        int dbIdx = JedisURIHelper.getDBIndex(uri);
+        String password = JedisURIHelper.getPassword(uri);
+
+        int timeout = Integer.parseInt(props.getProperty("redis.timeout", String.valueOf(DEFAULT_TIMEOUT)));
+
+        boolean clusterMode = Boolean.parseBoolean(props.getProperty("use.redis.cluster", "false"));
+
+        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, props, fieldNames);
+        if (clusterMode) {
+            // Only nodes and timeout are configured here; password and dbIdx are not applied.
+            JedisClusterConfig config = new JedisClusterConfig.Builder()
+                    .setNodes(Collections.singleton(new InetSocketAddress(host, port)))
+                    .setTimeout(timeout)
+                    .build();
+            return new RedisClusterTridentDataSource(config, props, fields, serializer);
+        } else {
+            JedisPoolConfig config = new JedisPoolConfig(host, port, timeout, password, dbIdx);
+            return new RedisTridentDataSource(config, props, fields, serializer);
+        }
+    }
+
+    /** Uses the primary-key field as the Redis key and the serialized tuple as the value. */
+    private static class TridentRedisStoreMapper implements RedisStoreMapper {
+        private final RedisDataTypeDescription dataTypeDescription;
+        private final FieldInfo primaryKeyField;
+        private final IOutputSerializer outputSerializer;
+
+        private TridentRedisStoreMapper(RedisDataTypeDescription dataTypeDescription, List<FieldInfo> fields, IOutputSerializer outputSerializer) {
+            this.dataTypeDescription = dataTypeDescription;
+            this.outputSerializer = outputSerializer;
+
+            // find the primary key among the declared fields
+            FieldInfo pkField = findPrimaryKeyField(fields);
+            Preconditions.checkArgument(pkField != null, "Primary key must be presented to field list");
+
+            this.primaryKeyField = pkField;
+        }
+
+        private FieldInfo findPrimaryKeyField(List<FieldInfo> fields) {
+            FieldInfo pkField = null;
+            for (FieldInfo field : fields) {
+                if (field.isPrimary()) {
+                    // TODO: this assumes key is only from the one field
+                    // if not we need to have order of fields in PK
+                    pkField = field;
+                    break;
+                }
+            }
+            return pkField;
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return dataTypeDescription;
+        }
+
+        /**
+         * Returns the primary-key value as a String.
+         *
+         * @throws NullPointerException if the key field is null in the tuple
+         */
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            String keyFieldName = primaryKeyField.name();
+            Object key = tuple.getValueByField(keyFieldName);
+            if (key == null) {
+                throw new NullPointerException("key field " + keyFieldName + " is null");
+            }
+            return String.valueOf(key);
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            java.nio.ByteBuffer serialized = outputSerializer.write(tuple.getValues(), null);
+            // Copy only the readable region [position, limit). ByteBuffer.array() ignores
+            // position/limit/arrayOffset and throws for read-only or direct buffers.
+            byte[] array = new byte[serialized.remaining()];
+            serialized.duplicate().get(array);
+            // Decode with an explicit charset rather than the JVM's platform default.
+            // NOTE(review): assumes the configured output serializer emits UTF-8 encoded text;
+            // TODO confirm for each supported output format.
+            return new String(array, java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..23b0444
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.redis.RedisDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
new file mode 100644
index 0000000..94d4949
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.redis;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestRedisDataSourcesProvider {
+    private static final List<FieldInfo> FIELDS = ImmutableList.of(
+            new FieldInfo("ID", int.class, true),
+            new FieldInfo("val", String.class, false));
+    private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+    private static final String ADDITIONAL_KEY = "hello";
+    private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+
+    /** Table properties for a HASH-typed table on a single (non-cluster) Redis. */
+    private static final Properties TBL_PROPERTIES = hashTableProperties(false);
+    /** Same as {@link #TBL_PROPERTIES}, plus {@code use.redis.cluster=true}. */
+    private static final Properties CLUSTER_TBL_PROPERTIES = hashTableProperties(true);
+
+    /**
+     * Builds the table properties shared by both sinks under test.
+     *
+     * @param useCluster whether to additionally set {@code use.redis.cluster=true}
+     * @return a fresh {@link Properties} instance
+     */
+    private static Properties hashTableProperties(boolean useCluster) {
+        Properties props = new Properties();
+        props.put("data.type", "HASH");
+        props.put("data.additional.key", ADDITIONAL_KEY);
+        if (useCluster) {
+            props.put("use.redis.cluster", "true");
+        }
+        return props;
+    }
+
+    /**
+     * A non-cluster table should be wired to {@link RedisState} and write every row into
+     * the {@code data.additional.key} hash through a pipelined {@link Jedis} connection.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRedisSink() {
+        URI uri = URI.create("redis://:foobared@localhost:6380/2");
+        ISqlTridentDataSource dataSource =
+                DataSourcesRegistry.constructTridentDataSource(uri, null, null, TBL_PROPERTIES, FIELDS);
+        Assert.assertNotNull(dataSource);
+
+        ISqlTridentDataSource.SqlTridentConsumer sink = dataSource.getConsumer();
+        Assert.assertEquals(RedisState.Factory.class, sink.getStateFactory().getClass());
+        Assert.assertEquals(RedisStateUpdater.class, sink.getStateUpdater().getClass());
+
+        RedisState redisState =
+                (RedisState) sink.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+        StateUpdater updater = sink.getStateUpdater();
+
+        // Swap the state's pool for mocks so that writes end up on the mock pipeline.
+        JedisPool pool = mock(JedisPool.class);
+        Jedis jedis = mock(Jedis.class);
+        Pipeline pipeline = mock(Pipeline.class);
+        Whitebox.setInternalState(redisState, "jedisPool", pool);
+        when(pool.getResource()).thenReturn(jedis);
+        when(jedis.pipelined()).thenReturn(pipeline);
+
+        List<TridentTuple> tuples = mockTupleList();
+        updater.updateState(redisState, tuples, null);
+
+        for (TridentTuple tuple : tuples) {
+            // Resolve expectations up front: calling the tuple mocks inside the
+            // verify(...) argument list would confuse Mockito's matcher bookkeeping.
+            String hashField = primaryKeyOf(tuple);
+            String jsonRow = serialize(tuple);
+            verify(pipeline).hset(eq(ADDITIONAL_KEY), eq(hashField), eq(jsonRow));
+        }
+        verify(pipeline).sync();
+        verify(jedis).close();
+    }
+
+    /**
+     * A table with {@code use.redis.cluster=true} should be wired to {@link RedisClusterState}
+     * and write every row into the {@code data.additional.key} hash through {@link JedisCluster}.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRedisClusterSink() throws IOException {
+        URI uri = URI.create("redis://localhost:6380");
+        ISqlTridentDataSource dataSource =
+                DataSourcesRegistry.constructTridentDataSource(uri, null, null, CLUSTER_TBL_PROPERTIES, FIELDS);
+        Assert.assertNotNull(dataSource);
+
+        ISqlTridentDataSource.SqlTridentConsumer sink = dataSource.getConsumer();
+        Assert.assertEquals(RedisClusterState.Factory.class, sink.getStateFactory().getClass());
+        Assert.assertEquals(RedisClusterStateUpdater.class, sink.getStateUpdater().getClass());
+
+        RedisClusterState clusterState =
+                (RedisClusterState) sink.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+        StateUpdater updater = sink.getStateUpdater();
+
+        // Swap the state's cluster client for a mock so that writes can be verified.
+        JedisCluster cluster = mock(JedisCluster.class);
+        Whitebox.setInternalState(clusterState, "jedisCluster", cluster);
+
+        List<TridentTuple> tuples = mockTupleList();
+        updater.updateState(clusterState, tuples, null);
+
+        for (TridentTuple tuple : tuples) {
+            String hashField = primaryKeyOf(tuple);
+            String jsonRow = serialize(tuple);
+            verify(cluster).hset(eq(ADDITIONAL_KEY), eq(hashField), eq(jsonRow));
+        }
+        // The cluster client is expected to stay open after the batch is written.
+        verify(cluster, never()).close();
+    }
+
+    /** Expected hash field: the primary-key ("ID") column rendered as a string. */
+    private static String primaryKeyOf(TridentTuple tuple) {
+        return String.valueOf(tuple.getValueByField("ID"));
+    }
+
+    /** Expected hash value: the whole row serialized with {@link #SERIALIZER}. */
+    private static String serialize(TridentTuple tuple) {
+        return new String(SERIALIZER.write(tuple.getValues(), null).array());
+    }
+
+    /** Two mocked rows: (1, "2") and (2, "3"), in that order. */
+    private static List<TridentTuple> mockTupleList() {
+        List<TridentTuple> tuples = new ArrayList<>();
+        tuples.add(mockTuple(1, "2"));
+        tuples.add(mockTuple(2, "3"));
+        return tuples;
+    }
+
+    /** Creates a tuple mock answering both by field name and with its full value list. */
+    private static TridentTuple mockTuple(int id, String val) {
+        TridentTuple tuple = mock(TridentTuple.class);
+        when(tuple.getValueByField("ID")).thenReturn(id);
+        when(tuple.getValueByField("val")).thenReturn(val);
+        doReturn(Lists.<Object>newArrayList(id, val)).when(tuple).getValues();
+        return tuple;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/pom.xml b/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..ce57cb2
--- /dev/null
+++ b/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-runtime</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ <version>1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>create-repo</id>
+ <goals>
+ <goal>create-repository</goal>
+ </goals>
+ <configuration>
+ <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+ <repositoryLayout>flat</repositoryLayout>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
new file mode 100644
index 0000000..aa7e435
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+
+import java.io.Serializable;
+
+/**
+ * Serializable subclass of Calcite's interpreter {@link Context}, built from a
+ * {@link DataContext}.
+ *
+ * <p>This is a workaround: the class is declared inside Calcite's
+ * {@code org.apache.calcite.interpreter} package so Storm can construct a {@code Context}
+ * directly. NOTE(review): presumably this is needed to reach a non-public {@code Context}
+ * constructor — confirm. Also confirm that serialization really works for this type.
+ * Java serialization only writes fields declared in {@link Serializable} classes. A
+ * non-serializable superclass also needs an accessible no-arg constructor to be deserialized.
+ */
+public class StormContext extends Context implements Serializable {
+    /**
+     * @param root the data context handed straight through to {@link Context}
+     */
+    public StormContext(DataContext root) {
+        super(root);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
new file mode 100644
index 0000000..64be39d
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * Convenience base for {@link ChannelHandler} implementations. Every callback except
+ * {@link #dataReceived} has a default, so subclasses override only what they need.
+ */
+public abstract class AbstractChannelHandler implements ChannelHandler {
+    /** Handler that forwards each received row unchanged via {@link ChannelContext#emit}. */
+    public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+        @Override
+        public void dataReceived(ChannelContext ctx, Values data) {
+            ctx.emit(data);
+        }
+    };
+
+    @Override
+    public abstract void dataReceived(ChannelContext ctx, Values data);
+
+    /** Default: ignores channel deactivation. */
+    @Override
+    public void channelInactive(ChannelContext ctx) {
+        // no-op by default
+    }
+
+    /**
+     * Default: ignores the error. NOTE(review): this silently drops {@code cause};
+     * subclasses that need to react to failures must override it.
+     */
+    @Override
+    public void exceptionCaught(Throwable cause) {
+        // no-op by default
+    }
+
+    /** Default: forwards the flush request to the channel context. */
+    @Override
+    public void flush(ChannelContext ctx) {
+        ctx.flush();
+    }
+
+    /** Default: ignores the source. */
+    @Override
+    public void setSource(ChannelContext ctx, Object source) {
+        // no-op by default
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..6a853be
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Base class for processors that read rows from a set of named data sources and deliver
+ * the resulting stream of rows ({@link Values}) to a {@link ChannelHandler}.
+ *
+ * <p>Subclasses implement {@link #initialize(Map, ChannelHandler)} to connect the given
+ * {@link DataSource}s to the result handler.
+ */
+public abstract class AbstractValuesProcessor {
+
+    /**
+     * Initializes the data sources and connects them to the result handler.
+     *
+     * @param data a map from table name to the {@link DataSource} supplying that table's values
+     * @param result the handler that receives the rows produced by this processor
+     */
+    public abstract void initialize(Map<String, DataSource> data, ChannelHandler result);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
new file mode 100644
index 0000000..65ad01c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * Context handed to a {@link ChannelHandler}. The handler uses it to interact with the
+ * rest of the data pipeline.
+ */
+public interface ChannelContext {
+    /**
+     * Emit data to the next stage of the data pipeline.
+     *
+     * @param data the row to forward
+     */
+    void emit(Values data);
+
+    /**
+     * Signals that this channel has become inactive.
+     * NOTE(review): presumably propagated to the next stage's
+     * {@link ChannelHandler#channelInactive}; confirm against implementations.
+     */
+    void fireChannelInactive();
+
+    /**
+     * Requests a flush of the channel. {@link AbstractChannelHandler#flush} forwards
+     * to this method by default.
+     */
+    void flush();
+
+    /**
+     * Associates a source object with this channel.
+     * NOTE(review): the expected type and meaning of {@code source} are not visible
+     * here; confirm against implementations.
+     *
+     * @param source the source object
+     */
+    void setSource(Object source);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
new file mode 100644
index 0000000..af02b7e
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * ChannelHandler provides an event-driven interface for the user to process
+ * a series of events (rows) arriving on a channel.
+ *
+ * <p>Every callback that receives a {@link ChannelContext} can use it to pass results
+ * downstream, e.g. via {@link ChannelContext#emit}.
+ */
+public interface ChannelHandler {
+    /**
+     * Called for each row received on the channel.
+     *
+     * @param ctx the context of the channel the row arrived on
+     * @param data the received values
+     */
+    void dataReceived(ChannelContext ctx, Values data);
+
+    /**
+     * The producer of the data has indicated that the channel is no longer
+     * active.
+     *
+     * @param ctx the context of the channel that became inactive
+     */
+    void channelInactive(ChannelContext ctx);
+
+    /**
+     * Reports an error to the handler.
+     * NOTE(review): which component invokes this, and when, is not visible here; confirm.
+     *
+     * @param cause the error being reported
+     */
+    void exceptionCaught(Throwable cause);
+
+    /**
+     * Asks the handler to flush. The default in {@link AbstractChannelHandler} forwards
+     * the request to {@link ChannelContext#flush}.
+     *
+     * @param ctx the context of the channel being flushed
+     */
+    void flush(ChannelContext ctx);
+
+    /**
+     * Notifies the handler of the channel's source object.
+     *
+     * @param ctx the context of the channel
+     * @param source the source object; its type is not constrained by this interface
+     */
+    void setSource(ChannelContext ctx, Object source);
+}