You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:44 UTC
[2/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API
to Streams API
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index 7383235..9fe963e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -47,15 +47,15 @@ public class DataSourcesRegistry {
}
/**
- * Construct a trident data source.
+ * Construct a streams data source.
* @param uri data source uri
* @param inputFormatClass input format class
* @param outputFormatClass output format class
* @param properties Properties
* @param fields fields info list
- * @return TridentDataSource object
+ * @return StreamsDataSource object
*/
- public static ISqlTridentDataSource constructTridentDataSource(
+ public static ISqlStreamsDataSource constructStreamsDataSource(
URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
DataSourcesProvider provider = providers.get(uri.getScheme());
@@ -63,7 +63,7 @@ public class DataSourcesRegistry {
return null;
}
- return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+ return provider.constructStreams(uri, inputFormatClass, outputFormatClass, properties, fields);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
new file mode 100644
index 0000000..da569ad
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+
+/**
+ * An ISqlStreamsDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlStreamsDataSource {
+ /**
+ * Provides instance of IRichSpout which can be used as producer in topology.
+ *
+ * @see org.apache.storm.topology.IRichSpout
+ */
+ IRichSpout getProducer();
+
+ /**
+ * Provides instance of IRichBolt which can be used as consumer in topology.
+ */
+ IRichBolt getConsumer();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
deleted file mode 100644
index 81fb386..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-/**
- * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
- */
-public interface ISqlTridentDataSource {
- /**
- * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
- * <p/>
- * Please note that StateFactory and StateUpdater should use same class which implements State.
- *
- * @see org.apache.storm.trident.state.StateFactory
- * @see org.apache.storm.trident.state.StateUpdater
- */
- interface SqlTridentConsumer {
- StateFactory getStateFactory();
-
- StateUpdater getStateUpdater();
- }
-
- /**
- * Provides instance of ITridentDataSource which can be used as producer in Trident.
- * <p/>
- * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
- * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
- * - IBatchSpout
- * - ITridentSpout
- * - IPartitionedTridentSpout
- * - IOpaquePartitionedTridentSpout
- *
- * @see org.apache.storm.trident.spout.ITridentDataSource
- * @see org.apache.storm.trident.spout.IBatchSpout
- * @see org.apache.storm.trident.spout.ITridentSpout
- * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
- * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
- */
- ITridentDataSource getProducer();
-
- /**
- * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
- *
- * @see SqlTridentConsumer
- */
- SqlTridentConsumer getConsumer();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
deleted file mode 100644
index f9a6039..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer {
- private final StateFactory stateFactory;
- private final StateUpdater stateUpdater;
-
- public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) {
- this.stateFactory = stateFactory;
- this.stateUpdater = stateUpdater;
- }
-
- @Override
- public StateFactory getStateFactory() {
- return stateFactory;
- }
-
- @Override
- public StateUpdater getStateUpdater() {
- return stateUpdater;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
index 10e470d..48f3273 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -26,16 +26,13 @@ import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
-import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.datasource.socket.bolt.SocketBolt;
+import org.apache.storm.sql.runtime.datasource.socket.spout.SocketSpout;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
/**
* Create a Socket data source based on the URI and properties. The URI has the format of
@@ -50,35 +47,33 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
return "socket";
}
- private static class SocketTridentDataSource implements ISqlTridentDataSource {
+ private static class SocketStreamsDataSource implements ISqlStreamsDataSource {
private final String host;
private final int port;
private final Scheme scheme;
private final IOutputSerializer serializer;
- SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
- this.scheme = scheme;
- this.serializer = serializer;
+ SocketStreamsDataSource(String host, int port, Scheme scheme, IOutputSerializer serializer) {
this.host = host;
this.port = port;
+ this.scheme = scheme;
+ this.serializer = serializer;
}
@Override
- public ITridentDataSource getProducer() {
- return new TridentSocketSpout(scheme, host, port);
+ public IRichSpout getProducer() {
+ return new SocketSpout(scheme, host, port);
}
@Override
- public SqlTridentConsumer getConsumer() {
- StateFactory stateFactory = new SocketState.Factory(host, port);
- StateUpdater<SocketState> stateUpdater = new SocketStateUpdater(serializer);
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+ public IRichBolt getConsumer() {
+ return new SocketBolt(serializer, host, port);
}
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
String host = uri.getHost();
int port = uri.getPort();
@@ -90,6 +85,6 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
- return new SocketTridentDataSource(scheme, serializer, host, port);
+ return new SocketDataSourcesProvider.SocketStreamsDataSource(host, port, scheme, serializer);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
new file mode 100644
index 0000000..78adef3
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this for production.
+ */
+public class SocketBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(SocketBolt.class);
+
+ private final IOutputSerializer serializer;
+ private final String host;
+ private final int port;
+
+ private transient OutputCollector collector;
+ private transient BufferedWriter writer;
+ private transient Socket socket;
+
+ /**
+ * Constructor.
+ */
+ public SocketBolt(IOutputSerializer serializer, String host, int port) {
+ this.serializer = serializer;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+
+ try {
+ this.socket = new Socket(host, port);
+ this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
+ } catch (IOException e) {
+ throw new RuntimeException("Exception while initializing socket for State. host "
+ + host + " port " + port, e);
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ Values values = (Values) input.getValue(0);
+ byte[] array = serializer.write(values, null).array();
+ String data = new String(array);
+
+ try {
+ writer.write(data + "\n");
+ writer.flush();
+ collector.ack(input);
+ } catch (IOException e) {
+ LOG.error("Error while writing data to socket.", e);
+ collector.reportError(e);
+ collector.fail(input);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ IOUtils.closeQuietly(writer);
+ IOUtils.closeQuietly(socket);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
new file mode 100644
index 0000000..b8743b9
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.spout;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spout for Socket data. Only available for Storm SQL.
+ * The class doesn't handle reconnection, so you may not want to use this for production.
+ */
+public class SocketSpout implements IRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(SocketSpout.class);
+
+ private final String host;
+ private final int port;
+ private final Scheme scheme;
+
+ private volatile boolean running;
+
+ private BlockingDeque<List<Object>> queue;
+ private Socket socket;
+ private Thread readerThread;
+ private BufferedReader in;
+ private ObjectMapper objectMapper;
+
+ private SpoutOutputCollector collector;
+ private Map<String, List<Object>> emitted;
+
+ /**
+ * SocketSpout Constructor.
+ * @param scheme Scheme
+ * @param host socket host
+ * @param port socket port
+ */
+ public SocketSpout(Scheme scheme, String host, int port) {
+ this.scheme = scheme;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.queue = new LinkedBlockingDeque<>();
+ this.emitted = new HashMap<>();
+ this.objectMapper = new ObjectMapper();
+
+ try {
+ socket = new Socket(host, port);
+ in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening socket: host " + host + " port " + port);
+ }
+
+ readerThread = new Thread(new SocketSpout.SocketReaderRunnable());
+ readerThread.start();
+ }
+
+ @Override
+ public void close() {
+ running = false;
+ readerThread.interrupt();
+ queue.clear();
+
+ closeQuietly(in);
+ closeQuietly(socket);
+ }
+
+ @Override
+ public void activate() {
+ running = true;
+ }
+
+ @Override
+ public void deactivate() {
+ running = false;
+ }
+
+ @Override
+ public void nextTuple() {
+ if (queue.peek() != null) {
+ List<Object> values = queue.poll();
+ if (values != null) {
+ String id = UUID.randomUUID().toString();
+ emitted.put(id, values);
+ collector.emit(values, id);
+ }
+ }
+ }
+
+ private List<Object> convertLineToTuple(String line) {
+ return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ emitted.remove(msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ List<Object> emittedValues = emitted.remove(msgId);
+ if (emittedValues != null) {
+ queue.addLast(emittedValues);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(scheme.getOutputFields());
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ private class SocketReaderRunnable implements Runnable {
+ public void run() {
+ while (running) {
+ try {
+ String line = in.readLine();
+ if (line == null) {
+ throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
+ }
+
+ List<Object> values = convertLineToTuple(line.trim());
+ queue.push(values);
+ } catch (Throwable t) {
+ // This spout is added to test purpose, so just failing fast doesn't hurt much
+ die(t);
+ }
+ }
+ }
+ }
+
+ private void die(Throwable t) {
+ LOG.error("Halting process: TridentSocketSpout died.", t);
+ if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+ System.exit(11);
+ }
+ }
+
+ private void closeQuietly(final Closeable closeable) {
+ try {
+ if (closeable != null) {
+ closeable.close();
+ }
+ } catch (final IOException ioe) {
+ // ignore
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
deleted file mode 100644
index 60c1a5e..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.Socket;
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-/**
- * Trident State implementation of Socket. It only supports writing.
- */
-public class SocketState implements State {
- /**
- * {@inheritDoc}
- */
- @Override
- public void beginCommit(Long txid) {
- // no-op
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void commit(Long txid) {
- // no-op
- }
-
- public static class Factory implements StateFactory {
- private final String host;
- private final int port;
-
- public Factory(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- BufferedWriter out;
- try {
- Socket socket = new Socket(host, port);
- out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
- } catch (IOException e) {
- throw new RuntimeException("Exception while initializing socket for State. host "
- + host + " port " + port, e);
- }
-
- // State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway
- return new SocketState(out);
- }
- }
-
- private BufferedWriter out;
-
- private SocketState(BufferedWriter out) {
- this.out = out;
- }
-
- public void write(String str) throws IOException {
- out.write(str);
- }
-
- public void flush() throws IOException {
- out.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
deleted file mode 100644
index df2893b..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
- */
-public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
- private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
-
- private final IOutputSerializer outputSerializer;
-
- public SocketStateUpdater(IOutputSerializer outputSerializer) {
- this.outputSerializer = outputSerializer;
- }
-
- @Override
- public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
- try {
- for (TridentTuple tuple : tuples) {
- byte[] array = outputSerializer.write(tuple.getValues(), null).array();
- String data = new String(array);
- state.write(data + "\n");
- }
- state.flush();
- } catch (IOException e) {
- LOG.error("Error while updating state.", e);
- collector.reportError(e);
- throw new RuntimeException("Error while updating state.", e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
deleted file mode 100644
index aad42fb..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
- */
-public class TridentSocketSpout implements IBatchSpout {
- private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
-
- private final String host;
- private final int port;
- private final Scheme scheme;
-
- private volatile boolean running = true;
-
- private BlockingDeque<String> queue;
- private Socket socket;
- private Thread readerThread;
- private BufferedReader in;
- private ObjectMapper objectMapper;
-
- private Map<Long, List<List<Object>>> batches;
-
- /**
- * TridentSocketSpout Constructor.
- * @param scheme Scheme
- * @param host socket host
- * @param port socket port
- */
- public TridentSocketSpout(Scheme scheme, String host, int port) {
- this.scheme = scheme;
- this.host = host;
- this.port = port;
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- this.queue = new LinkedBlockingDeque<>();
- this.objectMapper = new ObjectMapper();
- this.batches = new HashMap<>();
-
- try {
- socket = new Socket(host, port);
- in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- } catch (IOException e) {
- throw new RuntimeException("Error opening socket: host " + host + " port " + port);
- }
-
- readerThread = new Thread(new SocketReaderRunnable());
- readerThread.start();
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- // read line and parse it to json
- List<List<Object>> batch = this.batches.get(batchId);
- if (batch == null) {
- batch = new ArrayList<>();
-
- while (queue.peek() != null) {
- String line = queue.poll();
- List<Object> values = convertLineToTuple(line);
- if (values == null) {
- continue;
- }
-
- batch.add(values);
- }
-
- this.batches.put(batchId, batch);
- }
-
- for (List<Object> list : batch) {
- collector.emit(list);
- }
- }
-
- private List<Object> convertLineToTuple(String line) {
- return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
- }
-
- @Override
- public void ack(long batchId) {
- this.batches.remove(batchId);
- }
-
- @Override
- public void close() {
- running = false;
- readerThread.interrupt();
- queue.clear();
-
- closeQuietly(in);
- closeQuietly(socket);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-
- private class SocketReaderRunnable implements Runnable {
- public void run() {
- while (running) {
- try {
- String line = in.readLine();
- if (line == null) {
- throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
- }
-
- queue.push(line.trim());
- } catch (Throwable t) {
- // This spout is added to test purpose, so just failing fast doesn't hurt much
- die(t);
- }
- }
- }
- }
-
- private void die(Throwable t) {
- LOG.error("Halting process: TridentSocketSpout died.", t);
- if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
- System.exit(11);
- }
- }
-
- private void closeQuietly(final Closeable closeable) {
- try {
- if (closeable != null) {
- closeable.close();
- }
- } catch (final IOException ioe) {
- // ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
new file mode 100644
index 0000000..215e47f
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.FlatMapFunction;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationCalc implements FlatMapFunction<Values, Values> {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
+
+ private final ExecutableExpression filterInstance;
+ private final ExecutableExpression projectionInstance;
+ private final Object[] outputValues;
+ private final DataContext dataContext;
+
+ /**
+ * EvaluationCalc Constructor.
+ * @param filterInstance ExecutableExpression
+ * @param projectionInstance ExecutableExpression
+ * @param outputCount output count
+ * @param dataContext DataContext
+ */
+ public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
+ int outputCount, DataContext dataContext) {
+ this.filterInstance = filterInstance;
+ this.projectionInstance = projectionInstance;
+ this.outputValues = new Object[outputCount];
+ this.dataContext = dataContext;
+
+ if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+ }
+ if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public Iterable<Values> apply(Values input) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = input.toArray();
+
+ if (filterInstance != null) {
+ filterInstance.execute(calciteContext, outputValues);
+ // filtered out
+ if (outputValues[0] == null || !((Boolean) outputValues[0])) {
+ return Collections.emptyList();
+ }
+ }
+
+ if (projectionInstance != null) {
+ projectionInstance.execute(calciteContext, outputValues);
+ return Collections.singletonList(new Values(outputValues));
+ } else {
+ return Collections.singletonList(new Values(input.toArray()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
new file mode 100644
index 0000000..c3237fe
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationFilter implements Predicate<Values> {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+ private final ExecutableExpression filterInstance;
+ private final DataContext dataContext;
+ private final Object[] outputValues;
+
+ /**
+ * EvaluationFilter Constructor.
+ * @param filterInstance ExecutableExpression
+ * @param dataContext DataContext
+ */
+ public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
+ this.filterInstance = filterInstance;
+ this.dataContext = dataContext;
+ this.outputValues = new Object[1];
+
+ if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public boolean test(Values input) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = input.toArray();
+ filterInstance.execute(calciteContext, outputValues);
+ return (outputValues[0] != null && (boolean) outputValues[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
new file mode 100644
index 0000000..b10b4e8
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationFunction implements Function<Values, Values> {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+ private final ExecutableExpression projectionInstance;
+ private final Object[] outputValues;
+ private final DataContext dataContext;
+
+ /**
+ * EvaluationFunction Constructor.
+ * @param projectionInstance ExecutableExpression
+ * @param outputCount output count
+ * @param dataContext DataContext
+ */
+ public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+ this.projectionInstance = projectionInstance;
+ this.outputValues = new Object[outputCount];
+ this.dataContext = dataContext;
+
+ if (projectionInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public Values apply(Values input) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = input.toArray();
+ projectionInstance.execute(calciteContext, outputValues);
+ return new Values(outputValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
new file mode 100644
index 0000000..fd5035a
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairFunction;
+import org.apache.storm.tuple.Values;
+
+public class StreamInsertMapToPairFunction implements PairFunction<Values, Object, Values> {
+
+ private final int primaryKeyIndex;
+
+ public StreamInsertMapToPairFunction(int primaryKeyIndex) {
+ this.primaryKeyIndex = primaryKeyIndex;
+ }
+
+ @Override
+ public Pair<Object, Values> apply(Values input) {
+ return Pair.of(input.get(primaryKeyIndex), input);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
new file mode 100644
index 0000000..db5c5da
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.streams.operations.mappers.TupleValueMapper;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+public class StreamsScanTupleValueMapper implements TupleValueMapper<Values> {
+ private final List<String> fieldNames;
+
+ /**
+ * Constructor.
+ *
+ * @param fieldNames fields to be added to.
+ */
+ public StreamsScanTupleValueMapper(List<String> fieldNames) {
+ // prevent issue when the implementation of fieldNames is not serializable
+ // getRowType().getFieldNames() returns Calcite Pair$ which is NOT serializable
+ this.fieldNames = new ArrayList<>(fieldNames);
+ }
+
+ @Override
+ public Values apply(Tuple input) {
+ Values values = new Values();
+ for (String field : fieldNames) {
+ values.add(input.getValueByField(field));
+ }
+ return values;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
deleted file mode 100644
index 1b2f250..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationCalc implements OperationAwareFlatMapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
-
- private final ExecutableExpression filterInstance;
- private final ExecutableExpression projectionInstance;
- private final Object[] outputValues;
- private final DataContext dataContext;
-
- /**
- * EvaluationCalc Constructor.
- * @param filterInstance ExecutableExpression
- * @param projectionInstance ExecutableExpression
- * @param outputCount output count
- * @param dataContext DataContext
- */
- public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
- int outputCount, DataContext dataContext) {
- this.filterInstance = filterInstance;
- this.projectionInstance = projectionInstance;
- this.outputValues = new Object[outputCount];
- this.dataContext = dataContext;
- }
-
- @Override
- public void prepare(Map<String, Object> conf, TridentOperationContext context) {
- if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
- }
- if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
- }
- }
-
- @Override
- public void cleanup() {
-
- }
-
- @Override
- public Iterable<Values> execute(TridentTuple input) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = input.getValues().toArray();
-
- if (filterInstance != null) {
- filterInstance.execute(calciteContext, outputValues);
- // filtered out
- if (outputValues[0] == null || !((Boolean) outputValues[0])) {
- return Collections.emptyList();
- }
- }
-
- if (projectionInstance != null) {
- projectionInstance.execute(calciteContext, outputValues);
- return Collections.singletonList(new Values(outputValues));
- } else {
- return Collections.singletonList(new Values(input.getValues()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
deleted file mode 100644
index 83e187c..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationFilter extends BaseFilter {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
-
- private final ExecutableExpression filterInstance;
- private final DataContext dataContext;
- private final Object[] outputValues;
-
- /**
- * EvaluationFilter Constructor.
- * @param filterInstance ExecutableExpression
- * @param dataContext DataContext
- */
- public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
- this.filterInstance = filterInstance;
- this.dataContext = dataContext;
- this.outputValues = new Object[1];
- }
-
- @Override
- public void prepare(Map<String, Object> conf, TridentOperationContext context) {
- if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
- }
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = tuple.getValues().toArray();
- filterInstance.execute(calciteContext, outputValues);
- return (outputValues[0] != null && (boolean) outputValues[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
deleted file mode 100644
index 340492d..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationFunction implements OperationAwareMapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
-
- private final ExecutableExpression projectionInstance;
- private final Object[] outputValues;
- private final DataContext dataContext;
-
- /**
- * EvaluationFunction Constructor.
- * @param projectionInstance ExecutableExpression
- * @param outputCount output count
- * @param dataContext DataContext
- */
- public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
- this.projectionInstance = projectionInstance;
- this.outputValues = new Object[outputCount];
- this.dataContext = dataContext;
- }
-
- @Override
- public void prepare(Map<String, Object> conf, TridentOperationContext context) {
- if (projectionInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
- }
- }
-
- @Override
- public void cleanup() {
-
- }
-
- @Override
- public Values execute(TridentTuple input) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = input.getValues().toArray();
- projectionInstance.execute(calciteContext, outputValues);
- return new Values(outputValues);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
deleted file mode 100644
index cb12ab3..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class ForwardFunction extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(tuple.getValues());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 4c945fa..f728bf6 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -1,38 +1,23 @@
/*
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * * <p>
- * * http://www.apache.org/licenses/LICENSE-2.0
- * * <p>
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
package org.apache.storm.sql;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -41,7 +26,36 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.junit.rules.ExternalResource;
+
public class TestUtils {
+ public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
+ @Override
+ protected void before() throws Throwable {
+ MockInsertBolt.getCollectedValues().clear();
+ }
+ };
+
+ public static final ExternalResource mockBoltValueResource = new ExternalResource() {
+ @Override
+ protected void before() throws Throwable {
+ MockBolt.getCollectedValues().clear();
+ }
+ };
+
public static class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
return x + y;
@@ -85,428 +99,264 @@ public class TestUtils {
}
}
- public static class MockState implements State {
- /**
- * Collect all values in a static variable as the instance will go through serialization and deserialization.
- * NOTE: This should be cleared before or after running each test.
- */
- private transient static final List<List<Object> > VALUES = new ArrayList<>();
+ public static class MockSpout extends BaseRichSpout {
- public static List<List<Object>> getCollectedValues() {
- return VALUES;
+ private final List<Values> records;
+ private final Fields outputFields;
+ private boolean emitted = false;
+ private SpoutOutputCollector collector;
+
+ public MockSpout(List<Values> records, Fields outputFields) {
+ this.records = records;
+ this.outputFields = outputFields;
}
@Override
- public void beginCommit(Long txid) {
- // NOOP
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
}
@Override
- public void commit(Long txid) {
- // NOOP
- }
+ public void nextTuple() {
+ if (emitted) {
+ return;
+ }
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- for (TridentTuple tuple : tuples) {
- VALUES.add(tuple.getValues());
+ for (Values r : records) {
+ collector.emit(r);
}
- }
- }
- public static class MockStateFactory implements StateFactory {
+ emitted = true;
+ }
@Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new MockState();
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(outputFields);
}
}
- public static class MockStateUpdater implements StateUpdater<MockState> {
+ public static class MockBolt extends BaseRichBolt {
+ /**
+ * Collect all values in a static variable as the instance will go through serialization and deserialization.
+ * NOTE: This should be cleared before or after running each test.
+ */
+ private transient static final List<Values> VALUES = new ArrayList<>();
- @Override
- public void updateState(MockState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
+ public static List<Values> getCollectedValues() {
+ return VALUES;
}
@Override
- public void prepare(Map<String, Object> conf, TridentOperationContext context) {
- // NOOP
- }
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- @Override
- public void cleanup() {
- // NOOP
}
- }
- public static class MockSqlExprDataSource implements ISqlTridentDataSource {
@Override
- public IBatchSpout getProducer() {
- return new MockSqlExprDataSource.MockSpout();
+ public void execute(Tuple input) {
+ VALUES.add((Values) input.getValue(0));
}
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
- }
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
- private static class MockSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
-
- public MockSpout() {
- for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i, "x", null));
- }
- }
-
- private boolean emitted = false;
+ }
+ }
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
+ public static class MockInsertBolt extends BaseRichBolt {
+ /**
+ * Collect all values in a static variable as the instance will go through serialization and deserialization.
+ * NOTE: This should be cleared before or after running each test.
+ */
+ private transient static final List<Pair<Object, Values>> VALUES = new ArrayList<>();
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
+ public static List<Pair<Object, Values>> getCollectedValues() {
+ return VALUES;
+ }
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- @Override
- public void ack(long batchId) {
- }
+ }
- @Override
- public void close() {
- }
+ @Override
+ public void execute(Tuple input) {
+ VALUES.add(Pair.of(input.getValue(0), (Values) input.getValue(1)));
+ }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
}
}
- public static class MockSqlTridentDataSource implements ISqlTridentDataSource {
+ public static class MockSqlExprDataSource implements ISqlStreamsDataSource {
@Override
- public IBatchSpout getProducer() {
- return new MockSpout();
+ public IRichSpout getProducer() {
+ throw new UnsupportedOperationException("Not supported.");
}
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
- }
-
- private static class MockSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
-
- public MockSpout() {
- RECORDS.add(new Values(0, "a", "y"));
- RECORDS.add(new Values(1, "ab", "y"));
- RECORDS.add(new Values(2, "abc", "y"));
- RECORDS.add(new Values(3, "abcd", "y"));
- RECORDS.add(new Values(4, "abcde", "y"));
- }
-
- private boolean emitted = false;
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
-
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
-
- @Override
- public void ack(long batchId) {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
+ public IRichBolt getConsumer() {
+ return new MockBolt();
}
}
- public static class MockSqlTridentGroupedDataSource implements ISqlTridentDataSource {
+ public static class MockSqlStreamsOutputDataSource implements ISqlStreamsDataSource {
+
@Override
- public IBatchSpout getProducer() {
- return new MockGroupedSpout();
+ public IRichSpout getProducer() {
+ throw new UnsupportedOperationException("Not supported.");
}
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
}
- private static class MockGroupedSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE", "SCORE");
-
- public MockGroupedSpout() {
- for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i, 0, "x", "y", 5 - i, i * 10));
- }
- }
-
- private boolean emitted = false;
+ }
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
+ public static class MockSqlStreamsDataSource implements ISqlStreamsDataSource {
+ @Override
+ public IRichSpout getProducer() {
+ List<Values> records = new ArrayList<>();
+ records.add(new Values(0, "a", "y"));
+ records.add(new Values(1, "ab", "y"));
+ records.add(new Values(2, "abc", "y"));
+ records.add(new Values(3, "abcd", "y"));
+ records.add(new Values(4, "abcde", "y"));
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
+ Fields outputFields = new Fields("ID", "NAME", "ADDR");
+ return new MockSpout(records, outputFields);
+ }
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
+ @Override
+ public IRichBolt getConsumer() {
+ return new MockBolt();
+ }
- @Override
- public void ack(long batchId) {
- }
+ }
- @Override
- public void close() {
- }
+ public static class MockSqlStreamsInsertDataSource extends MockSqlStreamsNestedDataSource {
+ @Override
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
+ }
+ }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
+ public static class MockSqlStreamsGroupedDataSource implements ISqlStreamsDataSource {
+ @Override
+ public IRichSpout getProducer() {
+ List<Values> records = new ArrayList<>();
+ for (int i = 0; i < 5; ++i) {
+ records.add(new Values(i, 0, "x", "y", 5 - i, i * 10));
}
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
+ Fields outputFields = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE", "SCORE");
+ return new MockSpout(records, outputFields);
}
- }
- public static class MockSqlTridentJoinDataSourceEmp implements ISqlTridentDataSource {
@Override
- public IBatchSpout getProducer() {
- return new MockSpout();
+ public IRichBolt getConsumer() {
+ return new MockBolt();
}
+ }
+ public static class MockSqlStreamsInsertGroupedDataSource extends MockSqlStreamsGroupedDataSource {
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
}
+ }
- private static class MockSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("EMPID", "EMPNAME", "DEPTID");
-
- public MockSpout() {
- for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i, "emp-" + i, i % 2));
- }
- for (int i = 10; i < 15; ++i) {
- RECORDS.add(new Values(i, "emp-" + i, i));
- }
- }
-
- private boolean emitted = false;
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
-
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
-
- @Override
- public void ack(long batchId) {
- }
+ public static class MockSqlStreamsJoinDataSourceEmp implements ISqlStreamsDataSource {
+ @Override
+ public IRichSpout getProducer() {
+ List<Values> records = new ArrayList<>();
+ Fields outputFields = new Fields("EMPID", "EMPNAME", "DEPTID");
- @Override
- public void close() {
+ for (int i = 0; i < 5; ++i) {
+ records.add(new Values(i, "emp-" + i, i % 2));
}
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
+ for (int i = 10; i < 15; ++i) {
+ records.add(new Values(i, "emp-" + i, i));
}
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
+ return new MockSpout(records, outputFields);
}
- }
- public static class MockSqlTridentJoinDataSourceDept implements ISqlTridentDataSource {
@Override
- public IBatchSpout getProducer() {
- return new MockSpout();
+ public IRichBolt getConsumer() {
+ return new MockBolt();
}
+ }
+ public static class MockSqlStreamsInsertJoinDataSourceEmp extends MockSqlStreamsJoinDataSourceEmp {
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
}
+ }
- private static class MockSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("DEPTID", "DEPTNAME");
-
- public MockSpout() {
- for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i, "dept-" + i));
- }
- }
-
- private boolean emitted = false;
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
-
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
-
- @Override
- public void ack(long batchId) {
- }
+ public static class MockSqlStreamsJoinDataSourceDept implements ISqlStreamsDataSource {
+ @Override
+ public IRichSpout getProducer() {
+ List<Values> records = new ArrayList<>();
+ Fields outputFields = new Fields("DEPTID", "DEPTNAME");
- @Override
- public void close() {
+ for (int i = 0; i < 5; ++i) {
+ records.add(new Values(i, "dept-" + i));
}
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
+ return new MockSpout(records, outputFields);
+ }
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
+ @Override
+ public IRichBolt getConsumer() {
+ return new MockBolt();
}
}
- public static class MockSqlTridentNestedDataSource implements ISqlTridentDataSource {
+ public static class MockSqlStreamsInsertJoinDataSourceDept extends MockSqlStreamsJoinDataSourceDept {
@Override
- public IBatchSpout getProducer() {
- return new MockSpout();
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
}
+ }
+ public static class MockSqlStreamsNestedDataSource implements ISqlStreamsDataSource {
@Override
- public SqlTridentConsumer getConsumer() {
- return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
- }
-
- private static class MockSpout implements IBatchSpout {
- private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD");
-
- public MockSpout() {
- List<Integer> ints = Arrays.asList(100, 200, 300);
- for (int i = 0; i < 5; ++i) {
- Map<String, Integer> map = new HashMap<>();
- map.put("b", i);
- map.put("c", i*i);
- Map<String, Map<String, Integer>> mm = new HashMap<>();
- mm.put("a", map);
- RECORDS.add(new Values(i, map, mm, ints));
- }
- }
-
- private boolean emitted = false;
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- if (emitted) {
- return;
- }
-
- for (Values r : RECORDS) {
- collector.emit(r);
- }
- emitted = true;
- }
-
- @Override
- public void ack(long batchId) {
- }
+ public IRichSpout getProducer() {
+ List<Values> records = new ArrayList<>();
+ Fields outputFields = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD");
- @Override
- public void close() {
+ List<Integer> ints = Arrays.asList(100, 200, 300);
+ for (int i = 0; i < 5; ++i) {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("b", i);
+ map.put("c", i*i);
+ Map<String, Map<String, Integer>> mm = new HashMap<>();
+ mm.put("a", map);
+ records.add(new Values(i, map, mm, ints));
}
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
+ return new MockSpout(records, outputFields);
+ }
- @Override
- public Fields getOutputFields() {
- return OUTPUT_FIELDS;
- }
+ @Override
+ public IRichBolt getConsumer() {
+ return new MockBolt();
}
}
+ public static class MockSqlStreamsInsertNestedDataSource extends MockSqlStreamsNestedDataSource {
+ @Override
+ public IRichBolt getConsumer() {
+ return new MockInsertBolt();
+ }
+ }
public static long monotonicNow() {
final long NANOSECONDS_PER_MILLISECOND = 1000000;
return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
}
-}
+}
\ No newline at end of file