You are viewing a plain text version of this content. The canonical link to it was a hyperlink ("here") on the original page and is not preserved in this plain-text copy.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:04 UTC
[18/20] storm git commit: [StormSQL] STORM-1357. Support writing to Kafka streams in Storm SQL.
[StormSQL] STORM-1357. Support writing to Kafka streams in Storm SQL.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5efe2c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5efe2c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5efe2c4
Branch: refs/heads/master
Commit: b5efe2c4433c0245c2fe0c1119e64baf6ecca776
Parents: b60712f
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 30 18:58:29 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800
----------------------------------------------------------------------
.../apache/storm/sql/parser/SqlCreateTable.java | 14 +++++--
.../test/org/apache/storm/sql/TestStormSql.java | 2 +-
.../apache/storm/sql/kafka/JsonSerializer.java | 6 +--
.../sql/kafka/KafkaDataSourcesProvider.java | 35 ++++++++++++----
.../sql/kafka/TestKafkaDataSourcesProvider.java | 17 ++++++--
.../storm/sql/runtime/DataSourcesProvider.java | 3 +-
.../storm/sql/runtime/DataSourcesRegistry.java | 4 +-
.../jvm/storm/kafka/ByteBufferSerializer.java | 41 +++++++++++++++++++
.../src/jvm/storm/kafka/IntSerializer.java | 42 ++++++++++++++++++++
9 files changed, 140 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index 8fe4160..8ac52ed 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -113,13 +113,19 @@ public class SqlCreateTable extends SqlCall {
}
public String inputFormatClass() {
- return inputFormatClass == null ? null : SqlLiteral.stringValue(
- inputFormatClass);
+ return getString(inputFormatClass);
}
public String outputFormatClass() {
- return outputFormatClass == null ? null : SqlLiteral.stringValue
- (outputFormatClass);
+ return getString(outputFormatClass);
+ }
+
+ public String properties() {
+ return getString(properties);
+ }
+
+ private String getString(SqlNode n) {
+ return n == null ? null : SqlLiteral.stringValue(n);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index b238e18..add8da5 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -44,7 +44,7 @@ public class TestStormSql {
@Override
public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
+ String properties, List<FieldInfo> fields) {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
index 7c5aa57..e3d5d01 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -20,14 +20,12 @@ package org.apache.storm.sql.kafka;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.lang.CharSet;
import org.apache.storm.sql.runtime.IOutputSerializer;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.List;
class JsonSerializer implements IOutputSerializer {
@@ -53,6 +51,6 @@ class JsonSerializer implements IOutputSerializer {
} catch (IOException e) {
throw new RuntimeException(e);
}
- return ByteBuffer.wrap(sw.toString().getBytes(Charsets.UTF_8));
+ return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index e57e4d3..7da57ba 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -18,6 +18,7 @@
package org.apache.storm.sql.kafka;
import backtype.storm.spout.SchemeAsMultiScheme;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.storm.sql.runtime.*;
import storm.kafka.ZkHosts;
@@ -33,13 +34,15 @@ import storm.trident.operation.TridentOperationContext;
import storm.trident.spout.ITridentDataSource;
import storm.trident.tuple.TridentTuple;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
/**
- * Create a Kafka spout based on the URI. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic.
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
+ * of the Kafka broker.
*/
public class KafkaDataSourcesProvider implements DataSourcesProvider {
private static final int DEFAULT_ZK_PORT = 2181;
@@ -80,11 +83,14 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
private transient TridentKafkaState state;
private final String topic;
private final int primaryKeyIndex;
+ private final Properties producerProperties;
private final List<String> fieldNames;
- private KafkaTridentSink(String topic, int primaryKeyIndex, List<String> fieldNames) {
+ private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
+ List<String> fieldNames) {
this.topic = topic;
this.primaryKeyIndex = primaryKeyIndex;
+ this.producerProperties = producerProperties;
this.fieldNames = fieldNames;
}
@@ -100,6 +106,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
state = new TridentKafkaState()
.withKafkaTopicSelector(new StaticTopicSelector(topic))
.withTridentTupleToKafkaMapper(m);
+ state.prepare(producerProperties);
}
@Override
@@ -113,11 +120,13 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
private final String topic;
private final int primaryKeyIndex;
private final List<String> fields;
+ private final Properties producerProperties;
private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
- List<String> fields) {
+ Properties producerProperties, List<String> fields) {
this.conf = conf;
this.topic = topic;
this.primaryKeyIndex = primaryKeyIndex;
+ this.producerProperties = producerProperties;
this.fields = fields;
}
@@ -128,7 +137,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
@Override
public Function getConsumer() {
- return new KafkaTridentSink(topic, primaryKeyIndex, fields);
+ return new KafkaTridentSink(topic, primaryKeyIndex, producerProperties, fields);
}
}
@@ -145,7 +154,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
@Override
public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
+ String properties, List<FieldInfo> fields) {
int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
Map<String, String> values = parseURIParams(uri.getQuery());
@@ -163,7 +172,19 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
}
Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
- return new KafkaTridentDataSource(conf, topic, primaryIndex, fieldNames);
+ ObjectMapper mapper = new ObjectMapper();
+ Properties producerProp = new Properties();
+ try {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> map = mapper.readValue(properties, HashMap.class);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
+ Preconditions.checkNotNull(producerConfig, "Kafka Table must contain producer config");
+ producerProp.putAll(producerConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new KafkaTridentDataSource(conf, topic, primaryIndex, producerProp, fieldNames);
}
private static Map<String, String> parseURIParams(String query) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
index 531f764..9380b66 100644
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -17,10 +17,11 @@
*/
package org.apache.storm.sql.kafka;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
@@ -46,12 +47,20 @@ public class TestKafkaDataSourcesProvider {
new FieldInfo("val", String.class, false));
private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-
+ private static final String TBL_PROPERTIES = Joiner.on('\n').join(
+ "{\"producer\": {",
+ "\"bootstrap.servers\": \"localhost:9092\",",
+ "\"acks\": \"1\",",
+ "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
+ "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
+ "}",
+ "}"
+ );
@SuppressWarnings("unchecked")
@Test
public void testKafkaSink() {
ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("kafka://mock?topic=foo"), null, null, FIELDS);
+ URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
Assert.assertNotNull(ds);
KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
sink.prepare(null, null);
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index eaabc8d..0ce2d45 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -22,7 +22,6 @@ package org.apache.storm.sql.runtime;
import java.net.URI;
import java.util.List;
-import java.util.Map;
public interface DataSourcesProvider {
/**
@@ -46,5 +45,5 @@ public interface DataSourcesProvider {
ISqlTridentDataSource constructTrident(
URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields);
+ String properties, List<FieldInfo> fields);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index 0285c97..75cd391 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -61,13 +61,13 @@ public class DataSourcesRegistry {
public static ISqlTridentDataSource constructTridentDataSource(
URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
+ String properties, List<FieldInfo> fields) {
DataSourcesProvider provider = providers.get(uri.getScheme());
if (provider == null) {
return null;
}
- return provider.constructTrident(uri, inputFormatClass, outputFormatClass, fields);
+ return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
new file mode 100644
index 0000000..1a7238e
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.utils.Utils;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} for {@link ByteBuffer} values; the conversion to bytes is delegated
+ * to {@link Utils#toByteArray(ByteBuffer)}.
+ *
+ * <p>A {@code null} buffer is serialized to {@code null}, matching the behaviour of Kafka's
+ * built-in serializers, so records with a null value can still be produced.
+ */
+public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+        // Stateless: no configuration is required.
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release.
+    }
+
+    /**
+     * Serializes {@code b} without changing the position or limit of the caller's buffer.
+     *
+     * @param s the destination topic (unused)
+     * @param b the buffer to serialize; may be {@code null}
+     * @return the serialized bytes, or {@code null} when {@code b} is {@code null}
+     */
+    @Override
+    public byte[] serialize(String s, ByteBuffer b) {
+        if (b == null) {
+            return null;
+        }
+        // duplicate() shares the content but has its own position/limit, so the caller's
+        // buffer is left untouched even if the helper advances the position it reads.
+        return Utils.toByteArray(b.duplicate());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
new file mode 100644
index 0000000..07cbd26
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that encodes an {@link Integer} as four bytes in big-endian
+ * (network) byte order.
+ *
+ * <p>A {@code null} value is serialized to {@code null}, matching the behaviour of Kafka's
+ * built-in serializers, so records with a null value can still be produced.
+ */
+public class IntSerializer implements Serializer<Integer> {
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+        // Stateless: no configuration is required.
+    }
+
+    /**
+     * Encodes {@code val} as a 4-byte, big-endian array.
+     *
+     * @param topic the destination topic (unused)
+     * @param val   the value to encode; may be {@code null}
+     * @return the 4-byte encoding, or {@code null} when {@code val} is {@code null}
+     */
+    @Override
+    public byte[] serialize(String topic, Integer val) {
+        if (val == null) {
+            // Unboxing a null Integer would throw NullPointerException.
+            return null;
+        }
+        byte[] r = new byte[4];
+        // A newly wrapped ByteBuffer is big-endian, so the int view writes the
+        // most significant byte first.
+        IntBuffer b = ByteBuffer.wrap(r).asIntBuffer();
+        b.put(val);
+        return r;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release.
+    }
+}