You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/06 04:58:42 UTC
[1/3] storm git commit: STORM-2081:create external directory for
storm-sql various data sources and move storm-sql-kafka to it
Repository: storm
Updated Branches:
refs/heads/1.x-branch b0b12281c -> 064c63a6b
STORM-2081:create external directory for storm-sql various data sources and move storm-sql-kafka to it
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/237ff9b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/237ff9b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/237ff9b4
Branch: refs/heads/1.x-branch
Commit: 237ff9b4215b85e27b490c3cd656c7ed716c1570
Parents: b0b1228
Author: vesense <be...@163.com>
Authored: Mon Sep 5 11:01:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:11:34 2016 +0900
----------------------------------------------------------------------
external/sql/pom.xml | 2 +-
.../storm-sql-external/storm-sql-kafka/pom.xml | 97 +++++++++
.../org/apache/storm/sql/kafka/JsonScheme.java | 58 +++++
.../apache/storm/sql/kafka/JsonSerializer.java | 56 +++++
.../sql/kafka/KafkaDataSourcesProvider.java | 209 +++++++++++++++++++
...apache.storm.sql.runtime.DataSourcesProvider | 16 ++
.../storm/sql/kafka/TestJsonRepresentation.java | 50 +++++
.../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ++++++++++
external/sql/storm-sql-kafka/pom.xml | 97 ---------
.../org/apache/storm/sql/kafka/JsonScheme.java | 58 -----
.../apache/storm/sql/kafka/JsonSerializer.java | 56 -----
.../sql/kafka/KafkaDataSourcesProvider.java | 209 -------------------
...apache.storm.sql.runtime.DataSourcesProvider | 16 --
.../storm/sql/kafka/TestJsonRepresentation.java | 50 -----
.../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ----------
storm-dist/binary/src/main/assembly/binary.xml | 4 +-
16 files changed, 603 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 96bb012..5aa23a2 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -39,6 +39,6 @@
<modules>
<module>storm-sql-core</module>
<module>storm-sql-runtime</module>
- <module>storm-sql-kafka</module>
+ <module>storm-sql-external/storm-sql-kafka</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..021409b
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-kafka</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${storm.kafka.artifact.id}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${storm.kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
new file mode 100644
index 0000000..eed1282
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class JsonScheme implements Scheme {
+ private final List<String> fields;
+
+ JsonScheme(List<String> fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+ ArrayList<Object> list = new ArrayList<>();
+ for (String f : fields) {
+ list.add(map.get(f));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
new file mode 100644
index 0000000..e3d5d01
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+class JsonSerializer implements IOutputSerializer {
+ private final List<String> fieldNames;
+ private transient final JsonFactory jsonFactory;
+
+ JsonSerializer(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ jsonFactory = new JsonFactory();
+ }
+
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+ StringWriter sw = new StringWriter();
+ try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+ jg.writeStartObject();
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ jg.writeFieldName(fieldNames.get(i));
+ jg.writeObject(data.get(i));
+ }
+ jg.writeEndObject();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..54e160f
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.*;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
+ * of the Kafka broker.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+ private static final int DEFAULT_ZK_PORT = 2181;
+ private static class StaticTopicSelector implements KafkaTopicSelector {
+ private final String topic;
+
+ private StaticTopicSelector(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public String getTopic(TridentTuple tuple) {
+ return topic;
+ }
+ }
+
+ private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+ private final int primaryKeyIndex;
+ private final IOutputSerializer serializer;
+
+ private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+ this.primaryKeyIndex = primaryKeyIndex;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public Object getKeyFromTuple(TridentTuple tuple) {
+ return tuple.get(primaryKeyIndex);
+ }
+
+ @Override
+ public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+ return serializer.write(tuple.getValues(), null);
+ }
+ }
+
+ static class KafkaTridentSink extends BaseFunction {
+ private transient TridentKafkaState state;
+ private final String topic;
+ private final int primaryKeyIndex;
+ private final Properties producerProperties;
+ private final List<String> fieldNames;
+
+ private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
+ List<String> fieldNames) {
+ this.topic = topic;
+ this.primaryKeyIndex = primaryKeyIndex;
+ this.producerProperties = producerProperties;
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ JsonSerializer serializer = new JsonSerializer(fieldNames);
+ SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
+ state = new TridentKafkaState()
+ .withKafkaTopicSelector(new StaticTopicSelector(topic))
+ .withTridentTupleToKafkaMapper(m);
+ state.prepare(producerProperties);
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ state.updateState(Collections.singletonList(tuple), collector);
+ }
+ }
+
+ private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+ private final TridentKafkaConfig conf;
+ private final String topic;
+ private final int primaryKeyIndex;
+ private final List<String> fields;
+ private final String producerProperties;
+ private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+ String producerProperties, List<String> fields) {
+ this.conf = conf;
+ this.topic = topic;
+ this.primaryKeyIndex = primaryKeyIndex;
+ this.producerProperties = producerProperties;
+ this.fields = fields;
+ }
+
+ @Override
+ public ITridentDataSource getProducer() {
+ return new OpaqueTridentKafkaSpout(conf);
+ }
+
+ @Override
+ public Function getConsumer() {
+ Preconditions.checkNotNull(producerProperties,
+ "Writable Kafka Table " + topic + " must contain producer config");
+ Properties props = new Properties();
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
+ props.putAll(producerConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Preconditions.checkState(props.containsKey("bootstrap.servers"),
+ "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+ return new KafkaTridentSink(topic, primaryKeyIndex, props, fields);
+ }
+ }
+
+ @Override
+ public String scheme() {
+ return "kafka";
+ }
+
+ @Override
+ public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ String properties, List<FieldInfo> fields) {
+ int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+ ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+ Map<String, String> values = parseURIParams(uri.getQuery());
+ String topic = values.get("topic");
+ Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+ TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+ List<String> fieldNames = new ArrayList<>();
+ int primaryIndex = -1;
+ for (int i = 0; i < fields.size(); ++i) {
+ FieldInfo f = fields.get(i);
+ fieldNames.add(f.name());
+ if (f.isPrimary()) {
+ primaryIndex = i;
+ }
+ }
+ Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+ conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+
+ return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, fieldNames);
+ }
+
+ private static Map<String, String> parseURIParams(String query) {
+ HashMap<String, String> res = new HashMap<>();
+ if (query == null) {
+ return res;
+ }
+
+ String[] params = query.split("&");
+ for (String p : params) {
+ String[] v = p.split("=", 2);
+ if (v.length > 1) {
+ res.put(v[0], v[1]);
+ }
+ }
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
new file mode 100644
index 0000000..7e85410
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.utils.Utils;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestJsonRepresentation {
+ @Test
+ public void testJsonScheme() {
+ final List<String> fields = Lists.newArrayList("ID", "val");
+ final String s = "{\"ID\": 1, \"val\": \"2\"}";
+ JsonScheme scheme = new JsonScheme(fields);
+ List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
+ assertArrayEquals(new Object[] {1, "2"}, o.toArray());
+ }
+
+ @Test
+ public void testJsonSerializer() {
+ final List<String> fields = Lists.newArrayList("ID", "val");
+ List<Object> o = Lists.<Object> newArrayList(1, "2");
+ JsonSerializer s = new JsonSerializer(fields);
+ ByteBuffer buf = s.write(o, null);
+ byte[] b = Utils.toByteArray(buf);
+ assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..f6e75ab
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaDataSourcesProvider {
+ private static final List<FieldInfo> FIELDS = ImmutableList.of(
+ new FieldInfo("ID", int.class, true),
+ new FieldInfo("val", String.class, false));
+ private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+ private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+ private static final String TBL_PROPERTIES = Joiner.on('\n').join(
+ "{\"producer\": {",
+ "\"bootstrap.servers\": \"localhost:9092\",",
+ "\"acks\": \"1\",",
+ "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
+ "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
+ "}",
+ "}"
+ );
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testKafkaSink() {
+ ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+ URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
+ Assert.assertNotNull(ds);
+ KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
+ sink.prepare(null, null);
+ TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
+ KafkaProducer producer = mock(KafkaProducer.class);
+ doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
+ Whitebox.setInternalState(state, "producer", producer);
+ List<TridentTuple> tupleList = mockTupleList();
+ for (TridentTuple t : tupleList) {
+ state.updateState(Collections.singletonList(t), null);
+ verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+ }
+ verifyNoMoreInteractions(producer);
+ }
+
+ private static List<TridentTuple> mockTupleList() {
+ List<TridentTuple> tupleList = new ArrayList<>();
+ TridentTuple t0 = mock(TridentTuple.class);
+ TridentTuple t1 = mock(TridentTuple.class);
+ doReturn(1).when(t0).get(0);
+ doReturn(2).when(t1).get(0);
+ doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+ doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+ tupleList.add(t0);
+ tupleList.add(t1);
+ return tupleList;
+ }
+
+ private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
+ private static final int PRIMARY_INDEX = 0;
+ private final TridentTuple tuple;
+
+ private KafkaMessageMatcher(TridentTuple tuple) {
+ this.tuple = tuple;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean matches(Object o) {
+ ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
+ if (m.key() != tuple.get(PRIMARY_INDEX)) {
+ return false;
+ }
+ ByteBuffer buf = m.value();
+ ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+ return b.equals(buf);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
deleted file mode 100644
index d2796e1..0000000
--- a/external/sql/storm-sql-kafka/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>1.1.0-SNAPSHOT</version>
- <relativePath>../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-kafka</artifactId>
-
- <developers>
- <developer>
- <id>haohui</id>
- <name>Haohui Mai</name>
- <email>ricetons@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>${storm.kafka.artifact.id}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${storm.kafka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
deleted file mode 100644
index eed1282..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class JsonScheme implements Scheme {
- private final List<String> fields;
-
- JsonScheme(List<String> fields) {
- this.fields = fields;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- @SuppressWarnings("unchecked")
- HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
- ArrayList<Object> list = new ArrayList<>();
- for (String f : fields) {
- list.add(map.get(f));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(fields);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
deleted file mode 100644
index e3d5d01..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-class JsonSerializer implements IOutputSerializer {
- private final List<String> fieldNames;
- private transient final JsonFactory jsonFactory;
-
- JsonSerializer(List<String> fieldNames) {
- this.fieldNames = fieldNames;
- jsonFactory = new JsonFactory();
- }
-
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
- StringWriter sw = new StringWriter();
- try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
- jg.writeStartObject();
- for (int i = 0; i < fieldNames.size(); ++i) {
- jg.writeFieldName(fieldNames.get(i));
- jg.writeObject(data.get(i));
- }
- jg.writeEndObject();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
deleted file mode 100644
index 54e160f..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.spout.SchemeAsMultiScheme;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.*;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.kafka.trident.TridentKafkaConfig;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-/**
- * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
- * of the Kafka broker.
- */
-public class KafkaDataSourcesProvider implements DataSourcesProvider {
- private static final int DEFAULT_ZK_PORT = 2181;
- private static class StaticTopicSelector implements KafkaTopicSelector {
- private final String topic;
-
- private StaticTopicSelector(String topic) {
- this.topic = topic;
- }
-
- @Override
- public String getTopic(TridentTuple tuple) {
- return topic;
- }
- }
-
- private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
- private final int primaryKeyIndex;
- private final IOutputSerializer serializer;
-
- private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
- this.primaryKeyIndex = primaryKeyIndex;
- this.serializer = serializer;
- }
-
- @Override
- public Object getKeyFromTuple(TridentTuple tuple) {
- return tuple.get(primaryKeyIndex);
- }
-
- @Override
- public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
- return serializer.write(tuple.getValues(), null);
- }
- }
-
- static class KafkaTridentSink extends BaseFunction {
- private transient TridentKafkaState state;
- private final String topic;
- private final int primaryKeyIndex;
- private final Properties producerProperties;
- private final List<String> fieldNames;
-
- private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
- List<String> fieldNames) {
- this.topic = topic;
- this.primaryKeyIndex = primaryKeyIndex;
- this.producerProperties = producerProperties;
- this.fieldNames = fieldNames;
- }
-
- @Override
- public void cleanup() {
- super.cleanup();
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- JsonSerializer serializer = new JsonSerializer(fieldNames);
- SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
- state = new TridentKafkaState()
- .withKafkaTopicSelector(new StaticTopicSelector(topic))
- .withTridentTupleToKafkaMapper(m);
- state.prepare(producerProperties);
- }
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- state.updateState(Collections.singletonList(tuple), collector);
- }
- }
-
- private static class KafkaTridentDataSource implements ISqlTridentDataSource {
- private final TridentKafkaConfig conf;
- private final String topic;
- private final int primaryKeyIndex;
- private final List<String> fields;
- private final String producerProperties;
- private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
- String producerProperties, List<String> fields) {
- this.conf = conf;
- this.topic = topic;
- this.primaryKeyIndex = primaryKeyIndex;
- this.producerProperties = producerProperties;
- this.fields = fields;
- }
-
- @Override
- public ITridentDataSource getProducer() {
- return new OpaqueTridentKafkaSpout(conf);
- }
-
- @Override
- public Function getConsumer() {
- Preconditions.checkNotNull(producerProperties,
- "Writable Kafka Table " + topic + " must contain producer config");
- Properties props = new Properties();
- try {
- ObjectMapper mapper = new ObjectMapper();
- @SuppressWarnings("unchecked")
- HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
- @SuppressWarnings("unchecked")
- HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
- props.putAll(producerConfig);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- Preconditions.checkState(props.containsKey("bootstrap.servers"),
- "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
- return new KafkaTridentSink(topic, primaryKeyIndex, props, fields);
- }
- }
-
- @Override
- public String scheme() {
- return "kafka";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- String properties, List<FieldInfo> fields) {
- int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
- ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
- Map<String, String> values = parseURIParams(uri.getQuery());
- String topic = values.get("topic");
- Preconditions.checkNotNull(topic, "No topic of the spout is specified");
- TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
- List<String> fieldNames = new ArrayList<>();
- int primaryIndex = -1;
- for (int i = 0; i < fields.size(); ++i) {
- FieldInfo f = fields.get(i);
- fieldNames.add(f.name());
- if (f.isPrimary()) {
- primaryIndex = i;
- }
- }
- Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
- conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
-
- return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, fieldNames);
- }
-
- private static Map<String, String> parseURIParams(String query) {
- HashMap<String, String> res = new HashMap<>();
- if (query == null) {
- return res;
- }
-
- String[] params = query.split("&");
- for (String p : params) {
- String[] v = p.split("=", 2);
- if (v.length > 1) {
- res.put(v[0], v[1]);
- }
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 7f687cc..0000000
--- a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
deleted file mode 100644
index 7e85410..0000000
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.utils.Utils;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestJsonRepresentation {
- @Test
- public void testJsonScheme() {
- final List<String> fields = Lists.newArrayList("ID", "val");
- final String s = "{\"ID\": 1, \"val\": \"2\"}";
- JsonScheme scheme = new JsonScheme(fields);
- List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
- assertArrayEquals(new Object[] {1, "2"}, o.toArray());
- }
-
- @Test
- public void testJsonSerializer() {
- final List<String> fields = Lists.newArrayList("ID", "val");
- List<Object> o = Lists.<Object> newArrayList(1, "2");
- JsonSerializer s = new JsonSerializer(fields);
- ByteBuffer buf = s.write(o, null);
- byte[] b = Utils.toByteArray(buf);
- assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
deleted file mode 100644
index f6e75ab..0000000
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import static org.mockito.Mockito.*;
-
-public class TestKafkaDataSourcesProvider {
- private static final List<FieldInfo> FIELDS = ImmutableList.of(
- new FieldInfo("ID", int.class, true),
- new FieldInfo("val", String.class, false));
- private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
- private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
- private static final String TBL_PROPERTIES = Joiner.on('\n').join(
- "{\"producer\": {",
- "\"bootstrap.servers\": \"localhost:9092\",",
- "\"acks\": \"1\",",
- "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
- "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
- "}",
- "}"
- );
- @SuppressWarnings("unchecked")
- @Test
- public void testKafkaSink() {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
- KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
- sink.prepare(null, null);
- TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
- KafkaProducer producer = mock(KafkaProducer.class);
- doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
- Whitebox.setInternalState(state, "producer", producer);
- List<TridentTuple> tupleList = mockTupleList();
- for (TridentTuple t : tupleList) {
- state.updateState(Collections.singletonList(t), null);
- verify(producer).send(argThat(new KafkaMessageMatcher(t)));
- }
- verifyNoMoreInteractions(producer);
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
- private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
- private static final int PRIMARY_INDEX = 0;
- private final TridentTuple tuple;
-
- private KafkaMessageMatcher(TridentTuple tuple) {
- this.tuple = tuple;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean matches(Object o) {
- ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
- if (m.key() != tuple.get(PRIMARY_INDEX)) {
- return false;
- }
- ByteBuffer buf = m.value();
- ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
- return b.equals(buf);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index a16766b..5e1f4f1 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -298,8 +298,8 @@
</includes>
</fileSet>
<fileSet>
- <directory>${project.basedir}/../../external/sql/storm-sql-kafka/target</directory>
- <outputDirectory>external/sql/storm-sql-kafka</outputDirectory>
+ <directory>${project.basedir}/../../external/sql/storm-sql-external/storm-sql-kafka/target</directory>
+ <outputDirectory>external/sql/storm-sql-external/storm-sql-kafka</outputDirectory>
<includes>
<include>storm*jar</include>
</includes>
[2/3] storm git commit: Merge branch 'STORM-2081-1.x' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2081-1.x' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a929504
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a929504
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a929504
Branch: refs/heads/1.x-branch
Commit: 3a9295046d232ab734d293f2301487cb97cc058c
Parents: b0b1228 237ff9b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 6 13:12:27 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:12:27 2016 +0900
----------------------------------------------------------------------
external/sql/pom.xml | 2 +-
.../storm-sql-external/storm-sql-kafka/pom.xml | 97 +++++++++
.../org/apache/storm/sql/kafka/JsonScheme.java | 58 +++++
.../apache/storm/sql/kafka/JsonSerializer.java | 56 +++++
.../sql/kafka/KafkaDataSourcesProvider.java | 209 +++++++++++++++++++
...apache.storm.sql.runtime.DataSourcesProvider | 16 ++
.../storm/sql/kafka/TestJsonRepresentation.java | 50 +++++
.../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ++++++++++
external/sql/storm-sql-kafka/pom.xml | 97 ---------
.../org/apache/storm/sql/kafka/JsonScheme.java | 58 -----
.../apache/storm/sql/kafka/JsonSerializer.java | 56 -----
.../sql/kafka/KafkaDataSourcesProvider.java | 209 -------------------
...apache.storm.sql.runtime.DataSourcesProvider | 16 --
.../storm/sql/kafka/TestJsonRepresentation.java | 50 -----
.../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ----------
storm-dist/binary/src/main/assembly/binary.xml | 4 +-
16 files changed, 603 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-2081 to CHANGELOG
Posted by ka...@apache.org.
add STORM-2081 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/064c63a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/064c63a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/064c63a6
Branch: refs/heads/1.x-branch
Commit: 064c63a6be69b7817e299570249996d6111324b0
Parents: 3a92950
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 6 13:12:47 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:12:47 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/064c63a6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7491dc..5c39009 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -79,6 +79,7 @@
* STORM-1720: Support GEO in storm-redis
## 1.0.3
+ * STORM-2081: create external directory for storm-sql various data sources and move storm-sql-kafka to it
* STORM-2054: DependencyResolver should be aware of relative path and absolute path
* STORM-1344: Remove sql command from storm-jdbc build
* STORM-2070: Fix sigar native binary download link