You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:02 UTC
[16/20] storm git commit: [StormSQL] STORM-1222. Support Kafka as
external tables in StormSQL.
[StormSQL] STORM-1222. Support Kafka as external tables in StormSQL.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b60712f6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b60712f6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b60712f6
Branch: refs/heads/master
Commit: b60712f616be4579049d2ffd7e84f9047d20a8e3
Parents: a8894e6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Nov 19 14:09:26 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800
----------------------------------------------------------------------
external/sql/pom.xml | 1 +
external/sql/storm-sql-kafka/pom.xml | 111 +++++++++++
.../org/apache/storm/sql/kafka/JsonScheme.java | 56 ++++++
.../apache/storm/sql/kafka/JsonSerializer.java | 58 ++++++
.../sql/kafka/KafkaDataSourcesProvider.java | 184 +++++++++++++++++++
...apache.storm.sql.runtime.DataSourcesProvider | 16 ++
.../storm/sql/kafka/TestJsonRepresentation.java | 50 +++++
.../sql/kafka/TestKafkaDataSourcesProvider.java | 103 +++++++++++
.../storm/sql/runtime/IOutputSerializer.java | 31 ++++
pom.xml | 6 +
10 files changed, 616 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index e4dae94..7884d39 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -39,5 +39,6 @@
<modules>
<module>storm-sql-core</module>
<module>storm-sql-runtime</module>
+ <module>storm-sql-kafka</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..0f6bd19
--- /dev/null
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-kafka</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.2.1</version>
+ <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.8.2.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
new file mode 100644
index 0000000..80037c6
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A Storm {@link Scheme} that parses each message as a JSON object and projects
+ * the values of the configured field names, in order, into a tuple. A field that
+ * is absent from the JSON object is emitted as {@code null}.
+ */
+public class JsonScheme implements Scheme {
+    /**
+     * Shared mapper. ObjectMapper is expensive to build and safe to share once
+     * configured, so it is created once instead of once per message. Being static,
+     * it is also not part of the serialized scheme.
+     */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final List<String> fields;
+
+    JsonScheme(List<String> fields) {
+        this.fields = fields;
+    }
+
+    /**
+     * @param ser the raw message bytes, expected to hold a JSON object
+     * @return one value per configured field, in the order of {@code fields}
+     * @throws RuntimeException wrapping the {@link IOException} raised when the bytes cannot be parsed
+     */
+    @Override
+    public List<Object> deserialize(byte[] ser) {
+        try {
+            @SuppressWarnings("unchecked")
+            HashMap<String, Object> map = MAPPER.readValue(ser, HashMap.class);
+            ArrayList<Object> list = new ArrayList<>(fields.size());
+            for (String f : fields) {
+                list.add(map.get(f));
+            }
+            return list;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(fields);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
new file mode 100644
index 0000000..7c5aa57
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.lang.CharSet;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Serializes a row as a JSON object whose keys are the configured field names,
+ * in order, and returns the UTF-8 encoded bytes.
+ */
+class JsonSerializer implements IOutputSerializer {
+    /**
+     * Generator factory obtained from an ObjectMapper so that generators carry an
+     * ObjectCodec. Without a codec, JsonGenerator#writeObject only accepts simple
+     * wrapper types (String, standard Number types, Boolean, byte[]) and throws
+     * IllegalStateException for anything else. The factory is shared across instances.
+     */
+    private static final JsonFactory JSON_FACTORY = new ObjectMapper().getFactory();
+
+    private final List<String> fieldNames;
+
+    JsonSerializer(List<String> fieldNames) {
+        this.fieldNames = fieldNames;
+    }
+
+    /**
+     * @param data the row values, positionally matching the field names
+     * @param buffer ignored; a newly allocated buffer is always returned
+     * @return a buffer wrapping the UTF-8 encoded JSON object
+     * @throws IllegalArgumentException if {@code data} is null or its size differs from the number of fields
+     */
+    @Override
+    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+        Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+        StringWriter sw = new StringWriter();
+        try (JsonGenerator jg = JSON_FACTORY.createGenerator(sw)) {
+            jg.writeStartObject();
+            for (int i = 0; i < fieldNames.size(); ++i) {
+                jg.writeFieldName(fieldNames.get(i));
+                jg.writeObject(data.get(i));
+            }
+            jg.writeEndObject();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..e57e4d3
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.*;
+import storm.kafka.ZkHosts;
+import storm.kafka.trident.OpaqueTridentKafkaSpout;
+import storm.kafka.trident.TridentKafkaConfig;
+import storm.kafka.trident.TridentKafkaState;
+import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import storm.kafka.trident.selector.KafkaTopicSelector;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.spout.ITridentDataSource;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Create a Kafka spout based on the URI. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic.
+ * <p>
+ * The port is optional and defaults to 2181. Only the Trident API is supported.
+ * Messages are decoded with {@link JsonScheme} and encoded with {@link JsonSerializer}.
+ * The table's single primary key column is used as the Kafka message key.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+    private static final int DEFAULT_ZK_PORT = 2181;
+
+    /** Sends every tuple to one fixed topic. */
+    private static class StaticTopicSelector implements KafkaTopicSelector {
+        private final String topic;
+
+        private StaticTopicSelector(String topic) {
+            this.topic = topic;
+        }
+
+        @Override
+        public String getTopic(TridentTuple tuple) {
+            return topic;
+        }
+    }
+
+    /**
+     * Turns a tuple into a Kafka message. The key is the primary key column and the
+     * payload is the whole tuple serialized by the given {@link IOutputSerializer}.
+     */
+    private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+        private final int primaryKeyIndex;
+        private final IOutputSerializer serializer;
+
+        private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+            this.primaryKeyIndex = primaryKeyIndex;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public Object getKeyFromTuple(TridentTuple tuple) {
+            return tuple.get(primaryKeyIndex);
+        }
+
+        @Override
+        public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+            return serializer.write(tuple.getValues(), null);
+        }
+    }
+
+    /**
+     * Trident function that publishes each incoming tuple to Kafka through a
+     * {@link TridentKafkaState}. The state is transient and rebuilt in
+     * {@link #prepare}, so it is never serialized together with the function.
+     */
+    static class KafkaTridentSink extends BaseFunction {
+        private transient TridentKafkaState state;
+        private final String topic;
+        private final int primaryKeyIndex;
+        private final List<String> fieldNames;
+
+        private KafkaTridentSink(String topic, int primaryKeyIndex, List<String> fieldNames) {
+            this.topic = topic;
+            this.primaryKeyIndex = primaryKeyIndex;
+            this.fieldNames = fieldNames;
+        }
+
+        @Override
+        public void prepare(Map conf, TridentOperationContext context) {
+            JsonSerializer serializer = new JsonSerializer(fieldNames);
+            SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
+            // NOTE(review): state.prepare(...) is never called, so nothing here creates the
+            // state's Kafka producer (the unit test injects a mock one via reflection).
+            // Confirm the TridentKafkaState API and pass the producer configuration through.
+            state = new TridentKafkaState()
+                    .withKafkaTopicSelector(new StaticTopicSelector(topic))
+                    .withTridentTupleToKafkaMapper(m);
+        }
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            state.updateState(Collections.singletonList(tuple), collector);
+        }
+    }
+
+    /** Pairs an opaque Kafka spout (producer side) with a {@link KafkaTridentSink} (consumer side). */
+    private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+        private final TridentKafkaConfig conf;
+        private final String topic;
+        private final int primaryKeyIndex;
+        private final List<String> fields;
+
+        private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                       List<String> fields) {
+            this.conf = conf;
+            this.topic = topic;
+            this.primaryKeyIndex = primaryKeyIndex;
+            this.fields = fields;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            return new OpaqueTridentKafkaSpout(conf);
+        }
+
+        @Override
+        public Function getConsumer() {
+            return new KafkaTridentSink(topic, primaryKeyIndex, fields);
+        }
+    }
+
+    @Override
+    public String scheme() {
+        return "kafka";
+    }
+
+    /** Only the Trident API is supported for Kafka tables; this always throws. */
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                                List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds the Trident spout/sink pair for a Kafka table.
+     *
+     * @throws IllegalArgumentException if the URI has no usable ZooKeeper host
+     * @throws NullPointerException if the URI has no {@code topic} query parameter
+     * @throws IllegalStateException if the table does not declare exactly one primary key
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  List<FieldInfo> fields) {
+        // java.net.URI leaves the host null when it is missing or not a valid hostname
+        // (e.g. it contains '_'); fail fast instead of connecting to "null:2181".
+        Preconditions.checkArgument(uri.getHost() != null, "No ZooKeeper host is specified in %s", uri);
+        int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+        ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+        Map<String, String> values = parseURIParams(uri.getQuery());
+        String topic = values.get("topic");
+        Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+        TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+        List<String> fieldNames = new ArrayList<>(fields.size());
+        int primaryIndex = -1;
+        for (int i = 0; i < fields.size(); ++i) {
+            FieldInfo f = fields.get(i);
+            fieldNames.add(f.name());
+            if (f.isPrimary()) {
+                // A Kafka message has exactly one key, so reject a second primary column
+                // rather than silently keying by whichever one comes last.
+                Preconditions.checkState(primaryIndex == -1, "Kafka stream table must have a single primary key");
+                primaryIndex = i;
+            }
+        }
+        Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+        conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+        return new KafkaTridentDataSource(conf, topic, primaryIndex, fieldNames);
+    }
+
+    /**
+     * Splits a query string of the form {@code k1=v1&k2=v2} into a map. Parameters
+     * without '=' are skipped, and a repeated key keeps its last value.
+     *
+     * @param query the query component, or {@code null} if the URI has none
+     * @return a mutable map of parameter names to values; empty when {@code query} is null
+     */
+    private static Map<String, String> parseURIParams(String query) {
+        HashMap<String, String> res = new HashMap<>();
+        if (query == null) {
+            return res;
+        }
+
+        for (String p : query.split("&")) {
+            String[] v = p.split("=", 2);
+            if (v.length > 1) {
+                res.put(v[0], v[1]);
+            }
+        }
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
new file mode 100644
index 0000000..d2898e8
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.utils.Utils;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the JSON decoding ({@link JsonScheme}) and encoding ({@link JsonSerializer}) of rows. */
+public class TestJsonRepresentation {
+    /** JSON travels as UTF-8; never depend on the platform default charset. */
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    @Test
+    public void testJsonScheme() {
+        final List<String> fields = Lists.newArrayList("ID", "val");
+        final String s = "{\"ID\": 1, \"val\": \"2\"}";
+        JsonScheme scheme = new JsonScheme(fields);
+        List<Object> o = scheme.deserialize(s.getBytes(UTF_8));
+        assertArrayEquals(new Object[] {1, "2"}, o.toArray());
+    }
+
+    @Test
+    public void testJsonSchemeMissingField() {
+        final List<String> fields = Lists.newArrayList("ID", "val");
+        final String s = "{\"ID\": 1}";
+        JsonScheme scheme = new JsonScheme(fields);
+        List<Object> o = scheme.deserialize(s.getBytes(UTF_8));
+        assertArrayEquals(new Object[] {1, null}, o.toArray());
+    }
+
+    @Test
+    public void testJsonSerializer() {
+        final List<String> fields = Lists.newArrayList("ID", "val");
+        List<Object> o = Lists.<Object>newArrayList(1, "2");
+        JsonSerializer s = new JsonSerializer(fields);
+        ByteBuffer buf = s.write(o, null);
+        byte[] b = Utils.toByteArray(buf);
+        assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b, UTF_8));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..531f764
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+import storm.kafka.trident.TridentKafkaState;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaDataSourcesProvider {
+    private static final List<FieldInfo> FIELDS = ImmutableList.of(
+            new FieldInfo("ID", int.class, true),
+            new FieldInfo("val", String.class, false));
+    private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+    private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+
+    /**
+     * Builds the sink through the provider registry and swaps in a mock Kafka producer.
+     * It then checks that every tuple fed to the sink is sent exactly once, keyed by its
+     * primary key and carrying the JSON-serialized row.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testKafkaSink() {
+        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+                URI.create("kafka://mock?topic=foo"), null, null, FIELDS);
+        Assert.assertNotNull(ds);
+        KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
+        sink.prepare(null, null);
+        TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
+        Producer producer = mock(Producer.class);
+        Whitebox.setInternalState(state, "producer", producer);
+        List<TridentTuple> tupleList = mockTupleList();
+        for (TridentTuple t : tupleList) {
+            // Drive the sink itself so execute() is covered, not only the underlying state.
+            sink.execute(t, null);
+            verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+        }
+        verifyNoMoreInteractions(producer);
+    }
+
+    /** Two mock rows: (1, "2") and (2, "3"), with column 0 as the primary key. */
+    private static List<TridentTuple> mockTupleList() {
+        List<TridentTuple> tupleList = new ArrayList<>();
+        TridentTuple t0 = mock(TridentTuple.class);
+        TridentTuple t1 = mock(TridentTuple.class);
+        doReturn(1).when(t0).get(0);
+        doReturn(2).when(t1).get(0);
+        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+        tupleList.add(t0);
+        tupleList.add(t1);
+        return tupleList;
+    }
+
+    /** Matches a KeyedMessage whose key and JSON payload correspond to the given tuple. */
+    private static class KafkaMessageMatcher extends ArgumentMatcher<KeyedMessage> {
+        private static final int PRIMARY_INDEX = 0;
+        private final TridentTuple tuple;
+
+        private KafkaMessageMatcher(TridentTuple tuple) {
+            this.tuple = tuple;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean matches(Object o) {
+            KeyedMessage<Object, ByteBuffer> m = (KeyedMessage<Object, ByteBuffer>) o;
+            // Compare keys by value: '!=' on boxed Integers only works inside the Integer cache.
+            Object expectedKey = tuple.get(PRIMARY_INDEX);
+            if (expectedKey == null ? m.key() != null : !expectedKey.equals(m.key())) {
+                return false;
+            }
+            ByteBuffer expected = SERIALIZER.write(tuple.getValues(), null);
+            return expected.equals(m.message());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
new file mode 100644
index 0000000..b6670d9
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Converts a list of values (in this change, one SQL row with one entry per column)
+ * into bytes held in a ByteBuffer.
+ */
+public interface IOutputSerializer {
+ /**
+ * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+ * memory.
+ *
+ * @param data the values to serialize
+ * @param buffer an optional buffer the implementation may reuse. It may be {@code null} (the callers in
+ * this change always pass {@code null}), and an implementation may ignore it and return a new buffer.
+ * @return A ByteBuffer containing the serialized result.
+ */
+ ByteBuffer write(List<Object> data, ByteBuffer buffer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3387583..4c7388d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,6 +228,7 @@
<metrics-clojure.version>2.5.1</metrics-clojure.version>
<hdrhistogram.version>2.1.7</hdrhistogram.version>
<calcite.version>1.4.0-incubating</calcite.version>
+ <jackson.version>2.6.3</jackson.version>
</properties>
<modules>
@@ -694,6 +695,11 @@
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>