You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:16 UTC
[13/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
deleted file mode 100644
index 2aa98ba..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.kafka.trident.TridentKafkaConfig;
-import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
-import org.apache.storm.kafka.trident.TridentKafkaUpdater;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
- * of the Kafka broker.
- */
-public class KafkaDataSourcesProvider implements DataSourcesProvider {
- private static final int DEFAULT_ZK_PORT = 2181;
-
- private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
- private final int primaryKeyIndex;
- private final IOutputSerializer serializer;
-
- private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
- this.primaryKeyIndex = primaryKeyIndex;
- this.serializer = serializer;
- }
-
- @Override
- public Object getKeyFromTuple(TridentTuple tuple) {
- return tuple.get(primaryKeyIndex);
- }
-
- @Override
- public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
- return serializer.write(tuple.getValues(), null);
- }
- }
-
- private static class KafkaTridentDataSource implements ISqlTridentDataSource {
- private final TridentKafkaConfig conf;
- private final String topic;
- private final int primaryKeyIndex;
- private final Properties props;
- private final IOutputSerializer serializer;
- private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
- Properties props, IOutputSerializer serializer) {
- this.conf = conf;
- this.topic = topic;
- this.primaryKeyIndex = primaryKeyIndex;
- this.props = props;
- this.serializer = serializer;
- }
-
- @Override
- public ITridentDataSource getProducer() {
- return new OpaqueTridentKafkaSpout(conf);
- }
-
- @Override
- public SqlTridentConsumer getConsumer() {
- Preconditions.checkArgument(!props.isEmpty(),
- "Writable Kafka Table " + topic + " must contain producer config");
- HashMap<String, Object> producerConfig = (HashMap<String, Object>) props.get("producer");
- props.putAll(producerConfig);
- Preconditions.checkState(props.containsKey("bootstrap.servers"),
- "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
-
- SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer);
-
- TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
- .withKafkaTopicSelector(new DefaultTopicSelector(topic))
- .withProducerProperties(props)
- .withTridentTupleToKafkaMapper(mapper);
-
- TridentKafkaUpdater stateUpdater = new TridentKafkaUpdater();
-
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
- }
- }
-
- @Override
- public String scheme() {
- return "kafka";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
- ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
- Map<String, String> values = parseURIParams(uri.getQuery());
- String topic = values.get("topic");
- Preconditions.checkNotNull(topic, "No topic of the spout is specified");
- TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
- List<String> fieldNames = new ArrayList<>();
- int primaryIndex = -1;
- for (int i = 0; i < fields.size(); ++i) {
- FieldInfo f = fields.get(i);
- fieldNames.add(f.name());
- if (f.isPrimary()) {
- primaryIndex = i;
- }
- }
- Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
- Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
- conf.scheme = new SchemeAsMultiScheme(scheme);
- IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
-
- return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, serializer);
- }
-
- private static Map<String, String> parseURIParams(String query) {
- HashMap<String, String> res = new HashMap<>();
- if (query == null) {
- return res;
- }
-
- String[] params = query.split("&");
- for (String p : params) {
- String[] v = p.split("=", 2);
- if (v.length > 1) {
- res.put(v[0], v[1]);
- }
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 7f687cc..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
deleted file mode 100644
index 0cde492..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
-import org.apache.storm.kafka.trident.TridentKafkaUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Future;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class TestKafkaDataSourcesProvider {
- private static final List<FieldInfo> FIELDS = ImmutableList.of(
- new FieldInfo("ID", int.class, true),
- new FieldInfo("val", String.class, false));
- private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
- private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
- private static final Properties TBL_PROPERTIES = new Properties();
-
- static {
- Map<String,Object> map = new HashMap<>();
- map.put("bootstrap.servers", "localhost:9092");
- map.put("acks", "1");
- map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- TBL_PROPERTIES.put("producer", map);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testKafkaSink() {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
-
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
- Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
-
- TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- KafkaProducer producer = mock(KafkaProducer.class);
- doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
- Whitebox.setInternalState(state, "producer", producer);
-
- List<TridentTuple> tupleList = mockTupleList();
- for (TridentTuple t : tupleList) {
- state.updateState(Collections.singletonList(t), null);
- verify(producer).send(argThat(new KafkaMessageMatcher(t)));
- }
- verifyNoMoreInteractions(producer);
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
- private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
- private static final int PRIMARY_INDEX = 0;
- private final TridentTuple tuple;
-
- private KafkaMessageMatcher(TridentTuple tuple) {
- this.tuple = tuple;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean matches(Object o) {
- ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
- if (m.key() != tuple.get(PRIMARY_INDEX)) {
- return false;
- }
- ByteBuffer buf = m.value();
- ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
- return b.equals(buf);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
deleted file mode 100644
index 40fde8a..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-mongodb</artifactId>
-
- <developers>
- <developer>
- <id>vesense</id>
- <name>Xin Wang</name>
- <email>data.xinwang@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-mongodb</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
deleted file mode 100644
index 60d52d1..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.mongodb;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Create a MongoDB sink based on the URI and properties. The URI has the format of
- * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
- * The properties are in JSON format which specifies the name of the MongoDB collection and etc.
- */
-public class MongoDataSourcesProvider implements DataSourcesProvider {
-
- private static class MongoTridentDataSource implements ISqlTridentDataSource {
- private final String url;
- private final Properties props;
- private final IOutputSerializer serializer;
-
- private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
- this.url = url;
- this.props = props;
- this.serializer = serializer;
- }
-
- @Override
- public ITridentDataSource getProducer() {
- throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
- }
-
- @Override
- public SqlTridentConsumer getConsumer() {
- Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config");
- String serField = props.getProperty("trident.ser.field", "tridentSerField");
- MongoMapper mapper = new TridentMongoMapper(serField, serializer);
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(props.getProperty("collection.name"))
- .withMapper(mapper);
-
- StateFactory stateFactory = new MongoStateFactory(options);
- StateUpdater stateUpdater = new MongoStateUpdater();
-
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
- }
- }
-
- private static class TridentMongoMapper implements MongoMapper {
- private final String serField;
- private final IOutputSerializer serializer;
-
- private TridentMongoMapper(String serField, IOutputSerializer serializer) {
- this.serField = serField;
- this.serializer = serializer;
- }
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- byte[] array = serializer.write(tuple.getValues(), null).array();
- document.append(serField, array);
- return document;
- }
-
- @Override
- public Document toDocumentByKeys(List<Object> keys) {
- return null;
- }
- }
-
- @Override
- public String scheme() {
- return "mongodb";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
- IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
- return new MongoTridentDataSource(uri.toString(), properties, serializer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index e46d794..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.mongodb.MongoDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
deleted file mode 100644
index 3b15345..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.mongodb;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.storm.mongodb.common.MongoDBClient;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.bson.Document;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class TestMongoDataSourcesProvider {
- private static final List<FieldInfo> FIELDS = ImmutableList.of(
- new FieldInfo("ID", int.class, true),
- new FieldInfo("val", String.class, false));
- private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
- private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
- private static final Properties TBL_PROPERTIES = new Properties();
-
- static {
- TBL_PROPERTIES.put("collection.name", "collection1");
- TBL_PROPERTIES.put("trident.ser.field", "tridentSerField");
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMongoSink() {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("mongodb://127.0.0.1:27017/test"), null, null, TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
-
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
- Assert.assertEquals(MongoStateFactory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(MongoStateUpdater.class, consumer.getStateUpdater().getClass());
-
- MongoState state = (MongoState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- StateUpdater stateUpdater = consumer.getStateUpdater();
-
- MongoDBClient mongoClient = mock(MongoDBClient.class);
- Whitebox.setInternalState(state, "mongoClient", mongoClient);
-
- List<TridentTuple> tupleList = mockTupleList();
-
- for (TridentTuple t : tupleList) {
- stateUpdater.updateState(state, Collections.singletonList(t), null);
- verify(mongoClient).insert(argThat(new MongoArgMatcher(t)) , eq(true));
- }
-
- verifyNoMoreInteractions(mongoClient);
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
- private static class MongoArgMatcher extends ArgumentMatcher<List<Document>> {
- private final TridentTuple tuple;
-
- private MongoArgMatcher(TridentTuple tuple) {
- this.tuple = tuple;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean matches(Object o) {
- Document doc = ((List<Document>)o).get(0);
- ByteBuffer buf = ByteBuffer.wrap((byte[])doc.get(TBL_PROPERTIES.getProperty("trident.ser.field")));
- ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
- return b.equals(buf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/pom.xml b/external/sql/storm-sql-external/storm-sql-redis/pom.xml
deleted file mode 100644
index 19cc699..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-redis</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-sql-runtime</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-redis</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
deleted file mode 100644
index 68933b2..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.redis;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.trident.state.RedisClusterState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.tuple.ITuple;
-import redis.clients.util.JedisURIHelper;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Create a Redis sink based on the URI and properties. The URI has the format of
- * redis://:[password]@[host]:[port]/[dbIdx]. Only host is mandatory and others can be set to default.
- *
- * The properties are in JSON format which specifies the config of the Redis data type and etc.
- * Please note that when "use.redis.cluster" is "true", cluster discovery is only done from given URI.
- */
-public class RedisDataSourcesProvider implements DataSourcesProvider {
- private static final int DEFAULT_REDIS_PORT = 6379;
- private static final int DEFAULT_TIMEOUT = 2000;
-
- private abstract static class AbstractRedisTridentDataSource implements ISqlTridentDataSource, Serializable {
- protected abstract StateFactory newStateFactory();
- protected abstract StateUpdater newStateUpdater(RedisStoreMapper storeMapper);
-
- private final Properties props;
- private final List<FieldInfo> fields;
- private final IOutputSerializer serializer;
-
- AbstractRedisTridentDataSource(Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
- this.props = props;
- this.fields = fields;
- this.serializer = serializer;
- }
-
- @Override
- public ITridentDataSource getProducer() {
- throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
- }
-
- @Override
- public SqlTridentConsumer getConsumer() {
- RedisDataTypeDescription dataTypeDescription = getDataTypeDesc(props);
-
- RedisStoreMapper storeMapper = new TridentRedisStoreMapper(dataTypeDescription, fields, serializer);
-
- StateFactory stateFactory = newStateFactory();
- StateUpdater stateUpdater = newStateUpdater(storeMapper);
-
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
- }
-
- private RedisDataTypeDescription getDataTypeDesc(Properties props) {
- Preconditions.checkArgument(props.containsKey("data.type"),
- "Redis data source must contain \"data.type\" config");
-
- RedisDataTypeDescription.RedisDataType dataType = RedisDataTypeDescription.RedisDataType.valueOf(props.getProperty("data.type").toUpperCase());
- String additionalKey = props.getProperty("data.additional.key");
-
- return new RedisDataTypeDescription(dataType, additionalKey);
- }
- }
-
- private static class RedisClusterTridentDataSource extends AbstractRedisTridentDataSource {
- private final JedisClusterConfig config;
-
- RedisClusterTridentDataSource(JedisClusterConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
- super(props, fields, serializer);
- this.config = config;
- }
-
- @Override
- protected StateFactory newStateFactory() {
- return new RedisClusterState.Factory(config);
- }
-
- @Override
- protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
- return new RedisClusterStateUpdater(storeMapper);
- }
- }
-
- private static class RedisTridentDataSource extends AbstractRedisTridentDataSource {
- private final JedisPoolConfig config;
-
- RedisTridentDataSource(JedisPoolConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
- super(props, fields, serializer);
- this.config = config;
- }
-
- @Override
- protected StateFactory newStateFactory() {
- return new RedisState.Factory(config);
- }
-
- @Override
- protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
- return new RedisStateUpdater(storeMapper);
- }
- }
-
- @Override
- public String scheme() {
- return "redis";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) {
- Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);
-
- String host = uri.getHost();
- int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_REDIS_PORT;
- int dbIdx = JedisURIHelper.getDBIndex(uri);
- String password = JedisURIHelper.getPassword(uri);
-
- int timeout = Integer.parseInt(props.getProperty("redis.timeout", String.valueOf(DEFAULT_TIMEOUT)));
-
- boolean clusterMode = Boolean.valueOf(props.getProperty("use.redis.cluster", "false"));
-
- List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
- IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, props, fieldNames);
- if (clusterMode) {
- JedisClusterConfig config = new JedisClusterConfig.Builder()
- .setNodes(Collections.singleton(new InetSocketAddress(host, port)))
- .setTimeout(timeout)
- .build();
- return new RedisClusterTridentDataSource(config, props, fields, serializer);
- } else {
- JedisPoolConfig config = new JedisPoolConfig(host, port, timeout, password, dbIdx);
- return new RedisTridentDataSource(config, props, fields, serializer);
- }
- }
-
- private static class TridentRedisStoreMapper implements RedisStoreMapper {
- private final RedisDataTypeDescription dataTypeDescription;
- private final FieldInfo primaryKeyField;
- private final IOutputSerializer outputSerializer;
-
- private TridentRedisStoreMapper(RedisDataTypeDescription dataTypeDescription, List<FieldInfo> fields, IOutputSerializer outputSerializer) {
- this.dataTypeDescription = dataTypeDescription;
- this.outputSerializer = outputSerializer;
-
- // find primary key from constructor
- FieldInfo pkField = findPrimaryKeyField(fields);
- Preconditions.checkArgument(pkField != null, "Primary key must be presented to field list");
-
- this.primaryKeyField = pkField;
- }
-
- private FieldInfo findPrimaryKeyField(List<FieldInfo> fields) {
- FieldInfo pkField = null;
- for (FieldInfo field : fields) {
- if (field.isPrimary()) {
- // TODO: this assumes key is only from the one field
- // if not we need to have order of fields in PK
- pkField = field;
- break;
- }
- }
- return pkField;
- }
-
- @Override
- public RedisDataTypeDescription getDataTypeDescription() {
- return dataTypeDescription;
- }
-
- @Override
- public String getKeyFromTuple(ITuple tuple) {
- String keyFieldName = primaryKeyField.name();
- Object key = tuple.getValueByField(keyFieldName);
- if (key == null) {
- throw new NullPointerException("key field " + keyFieldName + " is null");
- }
- return String.valueOf(key);
- }
-
- @Override
- public String getValueFromTuple(ITuple tuple) {
- byte[] array = outputSerializer.write(tuple.getValues(), null).array();
- return new String(array);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 23b0444..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.redis.RedisDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
deleted file mode 100644
index 94d4949..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.redis;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.state.RedisClusterState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.Pipeline;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestRedisDataSourcesProvider {
- private static final List<FieldInfo> FIELDS = ImmutableList.of(
- new FieldInfo("ID", int.class, true),
- new FieldInfo("val", String.class, false));
- private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
- private static final String ADDITIONAL_KEY = "hello";
- private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
- private static final Properties TBL_PROPERTIES = new Properties();
- private static final Properties CLUSTER_TBL_PROPERTIES = new Properties();
-
- static {
- TBL_PROPERTIES.put("data.type", "HASH");
- TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
- CLUSTER_TBL_PROPERTIES.put("data.type", "HASH");
- CLUSTER_TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
- CLUSTER_TBL_PROPERTIES.put("use.redis.cluster", "true");
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRedisSink() {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("redis://:foobared@localhost:6380/2"), null, null, TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
-
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
- Assert.assertEquals(RedisState.Factory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(RedisStateUpdater.class, consumer.getStateUpdater().getClass());
-
- RedisState state = (RedisState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- StateUpdater stateUpdater = consumer.getStateUpdater();
-
- JedisPool mockJedisPool = mock(JedisPool.class);
- Jedis mockJedis = mock(Jedis.class);
- Pipeline mockPipeline = mock(Pipeline.class);
-
- Whitebox.setInternalState(state, "jedisPool", mockJedisPool);
- when(mockJedisPool.getResource()).thenReturn(mockJedis);
- when(mockJedis.pipelined()).thenReturn(mockPipeline);
-
- List<TridentTuple> tupleList = mockTupleList();
-
- stateUpdater.updateState(state, tupleList, null);
- for (TridentTuple t : tupleList) {
- // PK goes to the key
- String id = String.valueOf(t.getValueByField("ID"));
- String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
- verify(mockPipeline).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
- }
-
- verify(mockPipeline).sync();
- verify(mockJedis).close();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRedisClusterSink() throws IOException {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
- URI.create("redis://localhost:6380"), null, null, CLUSTER_TBL_PROPERTIES, FIELDS);
- Assert.assertNotNull(ds);
-
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
- Assert.assertEquals(RedisClusterState.Factory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(RedisClusterStateUpdater.class, consumer.getStateUpdater().getClass());
-
- RedisClusterState state = (RedisClusterState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- StateUpdater stateUpdater = consumer.getStateUpdater();
-
- JedisCluster mockJedisCluster = mock(JedisCluster.class);
-
- Whitebox.setInternalState(state, "jedisCluster", mockJedisCluster);
-
- List<TridentTuple> tupleList = mockTupleList();
-
- stateUpdater.updateState(state, tupleList, null);
- for (TridentTuple t : tupleList) {
- // PK goes to the key
- String id = String.valueOf(t.getValueByField("ID"));
- String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
- verify(mockJedisCluster).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
- }
-
- verify(mockJedisCluster, never()).close();
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- when(t0.getValueByField("ID")).thenReturn(1);
- when(t0.getValueByField("val")).thenReturn("2");
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
-
- when(t1.getValueByField("ID")).thenReturn(2);
- when(t1.getValueByField("val")).thenReturn("3");
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
-
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
deleted file mode 100644
index 4666df4..0000000
--- a/external/sql/storm-sql-runtime/pom.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-sql-runtime</artifactId>
-
- <developers>
- <developer>
- <id>haohui</id>
- <name>Haohui Mai</name>
- <email>ricetons@gmail.com</email>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- <version>${calcite.version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-dbcp</groupId>
- <artifactId>commons-dbcp</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-csv</artifactId>
- <version>1.4</version>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <testSourceDirectory>src/test</testSourceDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/resources</directory>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>appassembler-maven-plugin</artifactId>
- <version>1.9</version>
- <executions>
- <execution>
- <id>create-repo</id>
- <goals>
- <goal>create-repository</goal>
- </goals>
- <configuration>
- <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
- <repositoryLayout>flat</repositoryLayout>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
deleted file mode 100644
index aa7e435..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.interpreter;
-
-import org.apache.calcite.DataContext;
-
-import java.io.Serializable;
-
-/**
- * This is a hack to use Calcite Context.
- */
-public class StormContext extends Context implements Serializable {
- public StormContext(DataContext root) {
- super(root);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
deleted file mode 100644
index 64be39d..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public abstract class AbstractChannelHandler implements ChannelHandler {
- @Override
- public abstract void dataReceived(ChannelContext ctx, Values data);
-
- @Override
- public void channelInactive(ChannelContext ctx) {
-
- }
-
- @Override
- public void exceptionCaught(Throwable cause) {
-
- }
-
- @Override
- public void flush(ChannelContext ctx) {
- ctx.flush();
- }
-
- @Override
- public void setSource(ChannelContext ctx, Object source) {
-
- }
-
- public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
- @Override
- public void dataReceived(ChannelContext ctx, Values data) {
- ctx.emit(data);
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
deleted file mode 100644
index 5b18b5b..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-
-import java.util.Map;
-
-/**
- * Subclass of AbstractTupleProcessor provides a series of tuple. It
- * takes a series of iterators of {@link Values} and produces a stream of
- * tuple.
- *
- * The subclass implements the {@see next()} method to provide
- * the output of the stream. It can choose to return null in {@see next()} to
- * indicate that this particular iteration is a no-op. SQL processors depend
- * on this semantic to implement filtering and nullable records.
- */
-public abstract class AbstractValuesProcessor {
-
- /**
- * Initialize the data sources.
- *
- * @param data a map from the table name to the iterators of the values.
- *
- */
- public abstract void initialize(Map<String, DataSource> data, ChannelHandler
- result);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
deleted file mode 100644
index 65ad01c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public interface ChannelContext {
- /**
- * Emit data to the next stage of the data pipeline.
- */
- void emit(Values data);
- void fireChannelInactive();
- void flush();
- void setSource(Object source);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
deleted file mode 100644
index af02b7e..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-/**
- * DataListener provides an event-driven interface for the user to process
- * series of events.
- */
-public interface ChannelHandler {
- void dataReceived(ChannelContext ctx, Values data);
-
- /**
- * The producer of the data has indicated that the channel is no longer
- * active.
- * @param ctx
- */
- void channelInactive(ChannelContext ctx);
-
- void exceptionCaught(Throwable cause);
-
- void flush(ChannelContext ctx);
-
- void setSource(ChannelContext ctx, Object source);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
deleted file mode 100644
index 3b5eedd..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public class Channels {
- private static final ChannelContext VOID_CTX = new ChannelContext() {
- @Override
- public void emit(Values data) {}
-
- @Override
- public void fireChannelInactive() {}
-
- @Override
- public void flush() {
-
- }
-
- @Override
- public void setSource(java.lang.Object source) {
-
- }
- };
-
- private static class ChannelContextAdapter implements ChannelContext {
- private final ChannelHandler handler;
- private final ChannelContext next;
-
- public ChannelContextAdapter(
- ChannelContext next, ChannelHandler handler) {
- this.handler = handler;
- this.next = next;
- }
-
- @Override
- public void emit(Values data) {
- handler.dataReceived(next, data);
- }
-
- @Override
- public void fireChannelInactive() {
- handler.channelInactive(next);
- }
-
- @Override
- public void flush() {
- handler.flush(next);
- }
-
- @Override
- public void setSource(java.lang.Object source) {
- handler.setSource(next, source);
- next.setSource(source); // propagate through the chain
- }
- }
-
- private static class ForwardingChannelContext implements ChannelContext {
- private final ChannelContext next;
-
- public ForwardingChannelContext(ChannelContext next) {
- this.next = next;
- }
-
- @Override
- public void emit(Values data) {
- next.emit(data);
- }
-
- @Override
- public void fireChannelInactive() {
- next.fireChannelInactive();
- }
-
- @Override
- public void flush() {
- next.flush();
- }
-
- @Override
- public void setSource(Object source) {
- next.setSource(source);
- }
- }
-
- public static ChannelContext chain(
- ChannelContext next, ChannelHandler handler) {
- return new ChannelContextAdapter(next, handler);
- }
-
- public static ChannelContext voidContext() {
- return VOID_CTX;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
deleted file mode 100644
index 352af73..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-/**
- * A DataSource ingests data in StormSQL. It provides a series of tuple to
- * the downstream {@link ChannelHandler}.
- *
- */
-public interface DataSource {
- void open(ChannelContext ctx);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
deleted file mode 100644
index dbece9c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-public interface DataSourcesProvider {
- /**
- * @return the scheme of the data source
- */
- String scheme();
-
- /**
- * Construct a new data source.
- * @param uri The URI that specifies the data source. The format of the URI
- * is fully customizable.
- * @param inputFormatClass the name of the class that deserializes data.
- * It is null when unspecified.
- * @param outputFormatClass the name of the class that serializes data. It
- * is null when unspecified.
- * @param fields The name of the fields and the schema of the table.
- */
- DataSource construct(
- URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields);
-
- ISqlTridentDataSource constructTrident(
- URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
deleted file mode 100644
index dfefb01..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
-
-public class DataSourcesRegistry {
- private static final Logger LOG = LoggerFactory.getLogger(
- DataSourcesRegistry.class);
- private static final Map<String, DataSourcesProvider> providers;
-
- static {
- providers = new HashMap<>();
- ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
- DataSourcesProvider.class);
- for (DataSourcesProvider p : loader) {
- LOG.info("Registering scheme {} with {}", p.scheme(), p);
- providers.put(p.scheme(), p);
- }
- }
-
- private DataSourcesRegistry() {
- }
-
- public static DataSource construct(
- URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- DataSourcesProvider provider = providers.get(uri.getScheme());
- if (provider == null) {
- return null;
- }
-
- return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
- }
-
- public static ISqlTridentDataSource constructTridentDataSource(
- URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- DataSourcesProvider provider = providers.get(uri.getScheme());
- if (provider == null) {
- return null;
- }
-
- return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
- }
-
- /**
- * Allow unit tests to inject data sources.
- */
- public static Map<String, DataSourcesProvider> providerMap() {
- return providers;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
deleted file mode 100644
index 03b030b..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.io.Serializable;
-
-/**
- * Describe each column of the field
- */
-public class FieldInfo implements Serializable {
- private final String name;
- private final Class<?> type;
- private final boolean isPrimary;
-
- public FieldInfo(String name, Class<?> type, boolean isPrimary) {
- this.name = name;
- this.type = type;
- this.isPrimary = isPrimary;
- }
-
- public String name() {
- return name;
- }
-
- public Class<?> type() {
- return type;
- }
-
- public boolean isPrimary() {
- return isPrimary;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
deleted file mode 100644
index b6670d9..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public interface IOutputSerializer {
- /**
- * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
- * memory.
- *
- * @return A ByteBuffer contains the serialized result.
- */
- ByteBuffer write(List<Object> data, ByteBuffer buffer);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
deleted file mode 100644
index 9eae5ae..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-/**
- * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
- */
-public interface ISqlTridentDataSource {
- /**
- * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
- *
- * Please note that StateFactory and StateUpdater should use same class which implements State.
- *
- * @see org.apache.storm.trident.state.StateFactory
- * @see org.apache.storm.trident.state.StateUpdater
- */
- interface SqlTridentConsumer {
- StateFactory getStateFactory();
- StateUpdater getStateUpdater();
- }
-
- /**
- * Provides instance of ITridentDataSource which can be used as producer in Trident.
- *
- * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
- * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
- * - IBatchSpout
- * - ITridentSpout
- * - IPartitionedTridentSpout
- * - IOpaquePartitionedTridentSpout
- *
- * @see org.apache.storm.trident.spout.ITridentDataSource
- * @see org.apache.storm.trident.spout.IBatchSpout
- * @see org.apache.storm.trident.spout.ITridentSpout
- * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
- * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
- */
- ITridentDataSource getProducer();
-
- /**
- * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
- *
- * @see SqlTridentConsumer
- */
- SqlTridentConsumer getConsumer();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
deleted file mode 100644
index c9abd16..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer {
- private final StateFactory stateFactory;
- private final StateUpdater stateUpdater;
-
- public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) {
- this.stateFactory = stateFactory;
- this.stateUpdater = stateUpdater;
- }
-
- @Override
- public StateFactory getStateFactory() {
- return stateFactory;
- }
-
- @Override
- public StateUpdater getStateUpdater() {
- return stateUpdater;
- }
-}