You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:15 UTC
[12/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
deleted file mode 100644
index a373483..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-/**
- * Null-aware comparison helpers: a {@code null} operand yields a {@code null} result
- * instead of {@code true}/{@code false}.
- */
-public class StormSqlFunctions {
-    /**
-     * Null-propagating equality.
-     *
-     * @param b0 left operand, may be {@code null}
-     * @param b1 right operand, may be {@code null}
-     * @return {@code null} if either operand is {@code null}, otherwise {@code b0.equals(b1)}
-     */
-    public static Boolean eq(Object b0, Object b1) {
-        final boolean bothPresent = b0 != null && b1 != null;
-        return bothPresent ? Boolean.valueOf(b0.equals(b1)) : null;
-    }
-
-    /**
-     * Null-propagating inequality.
-     *
-     * @param b0 left operand, may be {@code null}
-     * @param b1 right operand, may be {@code null}
-     * @return {@code null} if either operand is {@code null}, otherwise {@code !b0.equals(b1)}
-     */
-    public static Boolean ne(Object b0, Object b1) {
-        final boolean bothPresent = b0 != null && b1 != null;
-        return bothPresent ? Boolean.valueOf(!b0.equals(b1)) : null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
deleted file mode 100644
index e78f354..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.calcite;
-
-import org.apache.calcite.interpreter.Context;
-
-/**
- * {@link ExecutableExpression} that forwards both {@code execute} variants to a wrapped
- * delegate and additionally exposes the code string handed over at construction time.
- */
-public class DebuggableExecutableExpression implements ExecutableExpression {
-    private ExecutableExpression delegate;
-    private String delegateCode;
-
-    /**
-     * @param delegate     expression that performs the actual work
-     * @param delegateCode code string associated with {@code delegate}, returned by {@link #getDelegateCode()}
-     */
-    public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
-        this.delegateCode = delegateCode;
-        this.delegate = delegate;
-    }
-
-    /**
-     * @return the code string supplied to the constructor
-     */
-    public String getDelegateCode() {
-        return this.delegateCode;
-    }
-
-    /**
-     * Forwards to the delegate's single-result {@code execute}.
-     */
-    @Override
-    public Object execute(Context context) {
-        final Object outcome = this.delegate.execute(context);
-        return outcome;
-    }
-
-    /**
-     * Forwards to the delegate's array-output {@code execute}.
-     */
-    @Override
-    public void execute(Context context, Object[] results) {
-        this.delegate.execute(context, results);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
deleted file mode 100644
index 8416945..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.calcite;
-
-import org.apache.calcite.interpreter.Context;
-
-import java.io.Serializable;
-
-/**
- * Compiled executable expression.
- *
- * <p>The interface extends {@link Serializable}, so implementations must be serializable.
- */
-public interface ExecutableExpression extends Serializable {
- /**
-  * Single-result form of the expression.
-  * NOTE(review): presumably evaluates against {@code context} and returns the computed
-  * value - confirm with the implementations.
-  */
- Object execute(Context context);
- /**
-  * Array-output form of the expression.
-  * NOTE(review): presumably stores the computed values into {@code results} - confirm
-  * with the implementations.
-  */
- void execute(Context context, Object[] results);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
deleted file mode 100644
index 4861b43..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.calcite;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.runtime.Hook;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.util.Holder;
-
-import java.io.Serializable;
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * Serializable {@link DataContext} that only answers variable lookups; schema, type factory
- * and query provider are all {@code null}.
- *
- * <p>This is based on SlimDataContext in Calcite, and borrows some parts from DataContextImpl in Calcite.
- */
-public class StormDataContext implements DataContext, Serializable {
-    private final ImmutableMap<Object, Object> map;
-
-    /**
-     * Captures the clock once and stores the time-related variables.
-     *
-     * <p>The SQL standard says that functions such as CURRENT_TIMESTAMP return the same value
-     * throughout the query, so the time at which the query started executing is stored here.
-     */
-    public StormDataContext() {
-        final Holder<Long> clock = Holder.of(System.currentTimeMillis());
-
-        // A registered hook gets the chance to alter the clock before it is read back.
-        Hook.CURRENT_TIME.run(clock);
-        final long startMillis = clock.get();
-
-        final TimeZone zone = Calendar.getInstance().getTimeZone();
-        // CURRENT_TIMESTAMP and LOCAL_TIMESTAMP are both shifted by the same zone offset.
-        final long shiftedMillis = startMillis + zone.getOffset(startMillis);
-
-        this.map = ImmutableMap.<Object, Object>builder()
-                .put(Variable.UTC_TIMESTAMP.camelName, startMillis)
-                .put(Variable.CURRENT_TIMESTAMP.camelName, shiftedMillis)
-                .put(Variable.LOCAL_TIMESTAMP.camelName, shiftedMillis)
-                .put(Variable.TIME_ZONE.camelName, zone)
-                .build();
-    }
-
-    /**
-     * @return the value stored under {@code name}, or {@code null} when there is none
-     */
-    @Override
-    public Object get(String name) {
-        return this.map.get(name);
-    }
-
-    /**
-     * @return always {@code null}
-     */
-    @Override
-    public SchemaPlus getRootSchema() {
-        return null;
-    }
-
-    /**
-     * @return always {@code null}
-     */
-    @Override
-    public JavaTypeFactory getTypeFactory() {
-        return null;
-    }
-
-    /**
-     * @return always {@code null}
-     */
-    @Override
-    public QueryProvider getQueryProvider() {
-        return null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
deleted file mode 100644
index d81e772..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
-import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Creates a Socket data source from a URI and properties. The URI has the format
- * socket://[host]:[port]; both host and port are mandatory.
- *
- * <p>The data source connects to the given host and port. It receives messages when used as an
- * input source and sends messages when used as an output data source.
- */
-public class SocketDataSourcesProvider implements DataSourcesProvider {
-    /**
-     * @return the URI scheme handled by this provider, {@code "socket"}
-     */
-    @Override
-    public String scheme() {
-        return "socket";
-    }
-
-    /**
-     * Not supported by this provider.
-     *
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Builds the Trident data source for the host and port named by {@code uri}.
-     *
-     * @throws RuntimeException if {@code uri} carries no port
-     */
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
-        final String targetHost = uri.getHost();
-        final int targetPort = uri.getPort();
-        if (targetPort == -1) {
-            throw new RuntimeException("Port information is not available. URI: " + uri);
-        }
-
-        final List<String> names = FieldInfoUtils.getFieldNames(fields);
-        final Scheme inputScheme = SerdeUtils.getScheme(inputFormatClass, properties, names);
-        final IOutputSerializer outputSerializer = SerdeUtils.getSerializer(outputFormatClass, properties, names);
-
-        return new SocketTridentDataSource(inputScheme, outputSerializer, targetHost, targetPort);
-    }
-
-    /**
-     * Trident data source bound to one host/port pair: the producer is a
-     * {@link TridentSocketSpout}, the consumer writes through {@link SocketState}.
-     */
-    private static class SocketTridentDataSource implements ISqlTridentDataSource {
-
-        private final String host;
-        private final int port;
-        private final Scheme scheme;
-        private final IOutputSerializer serializer;
-
-        SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
-            this.host = host;
-            this.port = port;
-            this.scheme = scheme;
-            this.serializer = serializer;
-        }
-
-        @Override
-        public ITridentDataSource getProducer() {
-            return new TridentSocketSpout(scheme, host, port);
-        }
-
-        @Override
-        public SqlTridentConsumer getConsumer() {
-            final StateFactory factory = new SocketState.Factory(host, port);
-            final StateUpdater<SocketState> updater = new SocketStateUpdater(serializer);
-            return new SimpleSqlTridentConsumer(factory, updater);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
deleted file mode 100644
index 3f85756..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.Socket;
-import java.util.Map;
-
-/**
- * Trident State implementation of Socket. It only supports writing.
- *
- * <p>Instances are created through {@link Factory}, which opens the connection; the state
- * itself just forwards {@link #write(String)} and {@link #flush()} to the socket's writer.
- */
-public class SocketState implements State {
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void beginCommit(Long txid) {
-        // no-op
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void commit(Long txid) {
-        // no-op
-    }
-
-    /**
-     * Opens a socket to the configured host and port and wraps its output stream in a {@link SocketState}.
-     */
-    public static class Factory implements StateFactory {
-        private final String host;
-        private final int port;
-
-        /**
-         * @param host host to connect to
-         * @param port port to connect to
-         */
-        public Factory(String host, int port) {
-            this.host = host;
-            this.port = port;
-        }
-
-        /**
-         * Connects to {@code host:port} and returns a state writing to that connection.
-         *
-         * @throws RuntimeException wrapping the {@link IOException} if the socket cannot be
-         *                          opened or its output stream cannot be obtained
-         */
-        @Override
-        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            Socket socket = null;
-            try {
-                socket = new Socket(host, port);
-                BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
-
-                // State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway
-                return new SocketState(out);
-            } catch (IOException e) {
-                // The connection may already be open when getOutputStream() fails; release it
-                // here because nobody else holds a reference to it.
-                if (socket != null) {
-                    try {
-                        socket.close();
-                    } catch (IOException closeFailure) {
-                        e.addSuppressed(closeFailure);
-                    }
-                }
-                throw new RuntimeException("Exception while initializing socket for State. host " +
-                        host + " port " + port, e);
-            }
-        }
-    }
-
-    private BufferedWriter out;
-
-    private SocketState(BufferedWriter out) {
-        this.out = out;
-    }
-
-    /**
-     * Writes {@code str} to the underlying writer; the data may stay buffered until {@link #flush()}.
-     *
-     * @throws IOException if the underlying writer fails
-     */
-    public void write(String str) throws IOException {
-        out.write(str);
-    }
-
-    /**
-     * Flushes the underlying writer.
-     *
-     * @throws IOException if the underlying writer fails
-     */
-    public void flush() throws IOException {
-        out.flush();
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
deleted file mode 100644
index 3062a90..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * StateUpdater for SocketState. Each tuple is serialized and written as one line; the state
- * is flushed once after the whole list has been written.
- */
-public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
-    private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
-
-    private final IOutputSerializer outputSerializer;
-
-    /**
-     * @param outputSerializer serializer applied to the values of every tuple
-     */
-    public SocketStateUpdater(IOutputSerializer outputSerializer) {
-        this.outputSerializer = outputSerializer;
-    }
-
-    /**
-     * Writes all tuples to {@code state} and flushes it.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} when writing or flushing fails;
-     *                          the failure is also logged and reported to {@code collector}
-     */
-    @Override
-    public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
-        try {
-            writeLines(state, tuples);
-            state.flush();
-        } catch (IOException e) {
-            LOG.error("Error while updating state.", e);
-            collector.reportError(e);
-            throw new RuntimeException("Error while updating state.", e);
-        }
-
-    }
-
-    /** Serializes each tuple and writes it to the state followed by a newline. */
-    private void writeLines(SocketState state, List<TridentTuple> tuples) throws IOException {
-        for (TridentTuple current : tuples) {
-            final byte[] serialized = outputSerializer.write(current.getValues(), null).array();
-            state.write(new String(serialized) + "\n");
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
deleted file mode 100644
index 97f63a7..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-
-/**
- * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
- *
- * <p>A background thread reads lines from the socket into a queue; {@link #emitBatch} drains
- * the queue in arrival order, converts each line with the configured {@link Scheme} and
- * remembers the batch until it is acked so the same batch id can be replayed.
- */
-public class TridentSocketSpout implements IBatchSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
-
-    private final String host;
-    private final int port;
-    private final Scheme scheme;
-
-    private volatile boolean _running = true;
-
-    private BlockingDeque<String> queue;
-    private Socket socket;
-    private Thread readerThread;
-    private BufferedReader in;
-    private ObjectMapper objectMapper;
-
-    private Map<Long, List<List<Object>>> batches;
-
-    /**
-     * @param scheme scheme used to turn each received line into tuple values
-     * @param host   host to connect to
-     * @param port   port to connect to
-     */
-    public TridentSocketSpout(Scheme scheme, String host, int port) {
-        this.scheme = scheme;
-        this.host = host;
-        this.port = port;
-    }
-
-    /**
-     * Connects to the socket and starts the reader thread.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} if the socket cannot be opened
-     */
-    @Override
-    public void open(Map conf, TopologyContext context) {
-        this.queue = new LinkedBlockingDeque<>();
-        this.objectMapper = new ObjectMapper();
-        this.batches = new HashMap<>();
-
-        try {
-            socket = new Socket(host, port);
-            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        } catch (IOException e) {
-            // Keep the cause so the underlying connection failure is visible in the stack trace.
-            throw new RuntimeException("Error opening socket: host " + host + " port " + port, e);
-        }
-
-        readerThread = new Thread(new SocketReaderRunnable());
-        readerThread.start();
-    }
-
-    /**
-     * Emits the batch for {@code batchId}. A batch id seen for the first time is filled with
-     * every line currently queued; a batch id that is still stored is re-emitted unchanged.
-     */
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        List<List<Object>> batch = this.batches.get(batchId);
-        if (batch == null) {
-            batch = new ArrayList<>();
-
-            String line;
-            while ((line = queue.poll()) != null) {
-                List<Object> values = convertLineToTuple(line);
-                // lines the scheme turns into null are skipped
-                if (values != null) {
-                    batch.add(values);
-                }
-            }
-
-            this.batches.put(batchId, batch);
-        }
-
-        for (List<Object> list : batch) {
-            collector.emit(list);
-        }
-    }
-
-    /** Hands the bytes of {@code line} to the scheme and returns the resulting values. */
-    private List<Object> convertLineToTuple(String line) {
-        return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
-    }
-
-    /**
-     * Forgets the stored batch for {@code batchId}.
-     */
-    @Override
-    public void ack(long batchId) {
-        this.batches.remove(batchId);
-    }
-
-    /**
-     * Stops the reader thread and releases the socket. Safe to call even if {@link #open} did
-     * not complete.
-     */
-    @Override
-    public void close() {
-        // Clear the flag first so the reader's failure path (die) does not halt the process
-        // when the socket is closed underneath it.
-        _running = false;
-        if (readerThread != null) {
-            readerThread.interrupt();
-        }
-        if (queue != null) {
-            queue.clear();
-        }
-
-        closeQuietly(in);
-        closeQuietly(socket);
-    }
-
-    /**
-     * @return a configuration limiting this component to a maximum task parallelism of 1
-     */
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    /**
-     * @return the output fields declared by the scheme
-     */
-    @Override
-    public Fields getOutputFields() {
-        return scheme.getOutputFields();
-    }
-
-    /** Reads lines from the socket and queues them, trimmed, until the spout stops running. */
-    private class SocketReaderRunnable implements Runnable {
-        public void run() {
-            while (_running) {
-                try {
-                    String line = in.readLine();
-                    if (line == null) {
-                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
-                    }
-
-                    // Append at the tail: emitBatch polls from the head, so lines are emitted
-                    // in the order they were received.
-                    queue.addLast(line.trim());
-                } catch (Throwable t) {
-                    // This spout is added to test purpose, so just failing fast doesn't hurt much
-                    die(t);
-                }
-            }
-        }
-    }
-
-    /** Logs the failure and halts the process with exit code 11 while running, or for any Error. */
-    private void die(Throwable t) {
-        LOG.error("Halting process: TridentSocketSpout died.", t);
-        if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
-            System.exit(11);
-        }
-    }
-
-    /** Closes {@code closeable} if it is non-null, ignoring any {@link IOException}. */
-    private void closeQuietly(final Closeable closeable) {
-        try {
-            if (closeable != null) {
-                closeable.close();
-            }
-        } catch (final IOException ioe) {
-            // ignore
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
deleted file mode 100644
index 3bf1a23..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@link Scheme} that decodes Avro binary records with generic (no code generation) readers
- * rather than specific (code generated) ones.
- */
-public class AvroScheme implements Scheme {
-    private final String schemaString;
-    private final List<String> fieldNames;
-    private final CachedSchemas schemas;
-
-    /**
-     * @param schemaString Avro schema definition used to decode incoming records
-     * @param fieldNames   record fields to extract, in output order
-     */
-    public AvroScheme(String schemaString, List<String> fieldNames) {
-        this.schemas = new CachedSchemas();
-        this.fieldNames = fieldNames;
-        this.schemaString = schemaString;
-    }
-
-    /**
-     * @return the configured field names as output fields
-     */
-    @Override
-    public Fields getOutputFields() {
-        return new Fields(fieldNames);
-    }
-
-    /**
-     * Decodes one record from {@code ser} and returns the values of the configured fields.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} raised while decoding
-     */
-    @Override
-    public List<Object> deserialize(ByteBuffer ser) {
-        try {
-            final Schema avroSchema = schemas.getSchema(schemaString);
-            final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
-            final byte[] payload = Utils.toByteArray(ser);
-            final BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(payload, null);
-            final GenericRecord decoded = datumReader.read(null, binaryDecoder);
-
-            final int fieldCount = fieldNames.size();
-            final List<Object> values = new ArrayList<>(fieldCount);
-            for (int i = 0; i < fieldCount; i++) {
-                // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
-                values.add(SerdeUtils.convertAvroUtf8(decoded.get(fieldNames.get(i))));
-            }
-            return values;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
deleted file mode 100644
index 5dc3393..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * {@link IOutputSerializer} that encodes rows as Avro binary using generic (no code
- * generation) writers rather than specific (code generated) ones.
- */
-public class AvroSerializer implements IOutputSerializer, Serializable {
-    private final String schemaString;
-    private final List<String> fieldNames;
-    private final CachedSchemas schemas;
-
-    /**
-     * @param schemaString Avro schema definition used to encode records
-     * @param fieldNames   record field names, positionally matching the data passed to {@link #write}
-     */
-    public AvroSerializer(String schemaString, List<String> fieldNames) {
-        this.schemas = new CachedSchemas();
-        this.fieldNames = fieldNames;
-        this.schemaString = schemaString;
-    }
-
-    /**
-     * Encodes {@code data} as one Avro record.
-     *
-     * @param data   values to encode, one per configured field name
-     * @param buffer not used by this implementation
-     * @return a buffer wrapping the encoded bytes
-     * @throws IllegalArgumentException if {@code data} is {@code null} or its size differs from the field count
-     * @throws RuntimeException         wrapping the {@link IOException} raised while encoding
-     */
-    @Override
-    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-        Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
-        try (ByteArrayOutputStream sink = new ByteArrayOutputStream()) {
-            final Schema avroSchema = schemas.getSchema(schemaString);
-            final GenericRecord row = new GenericData.Record(avroSchema);
-            int position = 0;
-            for (String fieldName : fieldNames) {
-                row.put(fieldName, data.get(position));
-                position++;
-            }
-
-            final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(row.getSchema());
-            final Encoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(sink, null);
-            datumWriter.write(row, binaryEncoder);
-            binaryEncoder.flush();
-            return ByteBuffer.wrap(sink.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
deleted file mode 100644
index 4f0e747..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-// TODO this class is reserved for supporting messages with different schemas.
-// current only one schema in the cache
-public class CachedSchemas implements Serializable{
-
- private final Map<String, Schema> cache = new HashMap<>();
-
- public Schema getSchema(String schemaString) {
- Schema schema = cache.get(schemaString);
- if (schema == null) {
- schema = new Schema.Parser().parse(schemaString);
- cache.put(schemaString, schema);
- }
- return schema;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
deleted file mode 100644
index 34fb1bb..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.csv;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * CsvScheme uses the standard RFC4180 CSV Parser
- * One of the difference from Tsv format is that fields with embedded commas will be quoted.
- * eg: a,"b,c",d is allowed.
- *
- * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
- */
-public class CsvScheme implements Scheme {
- private final List<String> fieldNames;
-
- public CsvScheme(List<String> fieldNames) {
- this.fieldNames = fieldNames;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- try {
- String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
- CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
- CSVRecord record = parser.getRecords().get(0);
- Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
-
- ArrayList<Object> list = new ArrayList<>(fieldNames.size());
- for (int i = 0; i < record.size(); i++) {
- list.add(record.get(i));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(fieldNames);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
deleted file mode 100644
index 0d3bd74..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.csv;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-/**
- * CsvSerializer uses the standard RFC4180 CSV Parser
- * One of the difference from Tsv format is that fields with embedded commas will be quoted.
- * eg: a,"b,c",d is allowed.
- *
- * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
- */
-public class CsvSerializer implements IOutputSerializer, Serializable {
- private final List<String> fields; //reserved for future
-
- public CsvSerializer(List<String> fields) {
- this.fields = fields;
- }
-
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- try {
- StringWriter writer = new StringWriter();
- CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
- for (Object o : data) {
- printer.print(o);
- }
- //since using StringWriter, we do not need to close it.
- return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
deleted file mode 100644
index d288fa1..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.json;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class JsonScheme implements Scheme {
- private final List<String> fields;
-
- public JsonScheme(List<String> fields) {
- this.fields = fields;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- @SuppressWarnings("unchecked")
- HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
- ArrayList<Object> list = new ArrayList<>(fields.size());
- for (String f : fields) {
- list.add(map.get(f));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(fields);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
deleted file mode 100644
index 1e825c4..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.json;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-public class JsonSerializer implements IOutputSerializer, Serializable {
- private final List<String> fieldNames;
- private final JsonFactory jsonFactory;
-
- public JsonSerializer(List<String> fieldNames) {
- this.fieldNames = fieldNames;
- jsonFactory = new JsonFactory();
- }
-
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
- StringWriter sw = new StringWriter();
- try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
- jg.writeStartObject();
- for (int i = 0; i < fieldNames.size(); ++i) {
- jg.writeFieldName(fieldNames.get(i));
- jg.writeObject(data.get(i));
- }
- jg.writeEndObject();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
deleted file mode 100644
index 310494c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.tsv;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TsvScheme uses a simple delimited format implemention by splitting string,
- * and it supports user defined delimiter.
- */
-public class TsvScheme implements Scheme {
- private final List<String> fieldNames;
- private final char delimiter;
-
- public TsvScheme(List<String> fieldNames, char delimiter) {
- this.fieldNames = fieldNames;
- this.delimiter = delimiter;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
- List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
- Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
-
- ArrayList<Object> list = new ArrayList<>(fieldNames.size());
- list.addAll(parts);
- return list;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(fieldNames);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
deleted file mode 100644
index 1cf1c76..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.tsv;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-/**
- * TsvSerializer uses a simple delimited format implemention by splitting string,
- * and it supports user defined delimiter.
- */
-public class TsvSerializer implements IOutputSerializer, Serializable {
- private final List<String> fields; //reserved for future
- private final char delimiter;
-
- public TsvSerializer(List<String> fields, char delimiter) {
- this.fields = fields;
- this.delimiter = delimiter;
- }
-
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
- for (int i = 0; i < data.size(); i++) {
- Object o = data.get(i);
- if (i == 0) {
- sb.append(o);
- } else {
- sb.append(delimiter);
- sb.append(o);
- }
- }
- return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
deleted file mode 100644
index 6c76481..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Map;
-
-public class EvaluationCalc implements OperationAwareFlatMapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
-
- private final ExecutableExpression filterInstance;
- private final ExecutableExpression projectionInstance;
- private final Object[] outputValues;
- private final DataContext dataContext;
-
- public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
- this.filterInstance = filterInstance;
- this.projectionInstance = projectionInstance;
- this.outputValues = new Object[outputCount];
- this.dataContext = dataContext;
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
- }
- if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
- }
- }
-
- @Override
- public void cleanup() {
-
- }
-
- @Override
- public Iterable<Values> execute(TridentTuple input) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = input.getValues().toArray();
-
- if (filterInstance != null) {
- filterInstance.execute(calciteContext, outputValues);
- // filtered out
- if (outputValues[0] == null || !((Boolean) outputValues[0])) {
- return Collections.emptyList();
- }
- }
-
- if (projectionInstance != null) {
- projectionInstance.execute(calciteContext, outputValues);
- return Collections.singletonList(new Values(outputValues));
- } else {
- return Collections.singletonList(new Values(input.getValues()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
deleted file mode 100644
index 9314852..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class EvaluationFilter extends BaseFilter {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
-
- private final ExecutableExpression filterInstance;
- private final DataContext dataContext;
- private final Object[] outputValues;
-
- public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
- this.filterInstance = filterInstance;
- this.dataContext = dataContext;
- this.outputValues = new Object[1];
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
- }
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = tuple.getValues().toArray();
- filterInstance.execute(calciteContext, outputValues);
- return (outputValues[0] != null && (boolean) outputValues[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
deleted file mode 100644
index 2608104..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class EvaluationFunction implements OperationAwareMapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
-
- private final ExecutableExpression projectionInstance;
- private final Object[] outputValues;
- private final DataContext dataContext;
-
- public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
- this.projectionInstance = projectionInstance;
- this.outputValues = new Object[outputCount];
- this.dataContext = dataContext;
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- if (projectionInstance instanceof DebuggableExecutableExpression) {
- LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
- }
- }
-
- @Override
- public void cleanup() {
-
- }
-
- @Override
- public Values execute(TridentTuple input) {
- Context calciteContext = new StormContext(dataContext);
- calciteContext.values = input.getValues().toArray();
- projectionInstance.execute(calciteContext, outputValues);
- return new Values(outputValues);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
deleted file mode 100644
index 4c3a266..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class ForwardFunction extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(tuple.getValues());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
deleted file mode 100644
index efd5d25..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.FieldInfo;
-
-import java.io.Serializable;
-import java.util.List;
-
-public final class FieldInfoUtils {
-
- public static List<String> getFieldNames(List<FieldInfo> fields) {
- return Lists.transform(fields, new FieldNameExtractor());
- }
-
- private static class FieldNameExtractor implements Function<FieldInfo, String>, Serializable {
- @Override
- public String apply(FieldInfo fieldInfo) {
- return fieldInfo.name();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
deleted file mode 100644
index 6b3dfc9..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
-import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
-import org.apache.storm.sql.runtime.serde.csv.CsvScheme;
-import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;
-import org.apache.storm.sql.runtime.serde.json.JsonScheme;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
-import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
-import org.apache.storm.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public final class SerdeUtils {
- public static Scheme getScheme(String inputFormatClass, Properties properties, List<String> fieldNames) {
- Scheme scheme;
- if (isNotEmpty(inputFormatClass)) {
- if (JsonScheme.class.getName().equals(inputFormatClass)) {
- scheme = new JsonScheme(fieldNames);
- } else if (TsvScheme.class.getName().equals(inputFormatClass)) {
- String delimiter = properties.getProperty("input.tsv.delimiter", "\t");
- scheme = new TsvScheme(fieldNames, delimiter.charAt(0));
- } else if (CsvScheme.class.getName().equals(inputFormatClass)) {
- scheme = new CsvScheme(fieldNames);
- } else if (AvroScheme.class.getName().equals(inputFormatClass)) {
- String schemaString = properties.getProperty("input.avro.schema");
- Preconditions.checkArgument(isNotEmpty(schemaString), "input.avro.schema can not be empty");
- scheme = new AvroScheme(schemaString, fieldNames);
- } else {
- scheme = Utils.newInstance(inputFormatClass);
- }
- } else {
- //use JsonScheme as the default scheme
- scheme = new JsonScheme(fieldNames);
- }
- return scheme;
- }
-
- public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List<String> fieldNames) {
- IOutputSerializer serializer;
- if (isNotEmpty(outputFormatClass)) {
- if (JsonSerializer.class.getName().equals(outputFormatClass)) {
- serializer = new JsonSerializer(fieldNames);
- } else if (TsvSerializer.class.getName().equals(outputFormatClass)) {
- String delimiter = properties.getProperty("output.tsv.delimiter", "\t");
- serializer = new TsvSerializer(fieldNames, delimiter.charAt(0));
- } else if (CsvSerializer.class.getName().equals(outputFormatClass)) {
- serializer = new CsvSerializer(fieldNames);
- } else if (AvroSerializer.class.getName().equals(outputFormatClass)) {
- String schemaString = properties.getProperty("output.avro.schema");
- Preconditions.checkArgument(isNotEmpty(schemaString), "output.avro.schema can not be empty");
- serializer = new AvroSerializer(schemaString, fieldNames);
- } else {
- serializer = Utils.newInstance(outputFormatClass);
- }
- } else {
- //use JsonSerializer as the default serializer
- serializer = new JsonSerializer(fieldNames);
- }
- return serializer;
- }
-
- public static Object convertAvroUtf8(Object value){
- Object ret;
- if (value instanceof Utf8) {
- ret = value.toString();
- } else if (value instanceof Map<?, ?>) {
- ret = convertAvroUtf8Map((Map<Object,Object>)value);
- } else if (value instanceof GenericData.Array) {
- ret = convertAvroUtf8Array((GenericData.Array)value);
- } else {
- ret = value;
- }
- return ret;
- }
-
- public static Object convertAvroUtf8Map(Map<Object,Object> value) {
- Map<Object, Object> map = new HashMap<>(value.size());
- for (Map.Entry<Object, Object> entry : value.entrySet()) {
- Object k = convertAvroUtf8(entry.getKey());
- Object v = convertAvroUtf8(entry.getValue());
- map.put(k, v);
- }
- return map;
- }
-
- public static Object convertAvroUtf8Array(GenericData.Array value){
- List<Object> ls = new ArrayList<>(value.size());
- for(Object o : value){
- ls.add(convertAvroUtf8(o));
- }
- return ls;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
deleted file mode 100644
index a0f3af3..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public final class Utils {
-
- /**
- * This method for splitting string into parts by a delimiter
- * It has higher performance than String.split(String regex)
- *
- * @param data
- * @param delimiter
- * @return
- */
- public static List<String> split(String data, char delimiter){
- List<String> list = new LinkedList<>();
- //do not use .toCharArray avoid system copy
- StringBuilder sb = new StringBuilder(512);
- int len = data.length();
- for (int i=0; i < len; i++) {
- char ch = data.charAt(i);
- if(ch == delimiter){
- list.add(sb.toString());
- sb.setLength(0);
- if(i == len - 1){
- list.add("");
- }
- }else{
- sb.append(ch);
- }
- }
- if (sb.length() > 0) {
- list.add(sb.toString());
- }
- return list;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 9a945f7..0000000
--- a/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.runtime.datasource.socket.SocketDataSourcesProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
deleted file mode 100644
index 174bfde..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
-import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestAvroSerializer {
- private static final String schemaString = "{\"type\":\"record\"," +
- "\"name\":\"avrotest\"," +
- "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"}," +
- "{ \"name\":\"val\", \"type\":\"string\" }]}";
-
- private static final String schemaString1 = "{\"type\":\"record\"," +
- "\"name\":\"avrotest\"," +
- "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"}," +
- "{ \"type\":{\"type\":\"map\",\"values\": \"long\"}, \"name\":\"val1\" }," +
- "{ \"type\":{\"type\":\"array\",\"items\": \"string\"}, \"name\":\"val2\" }]}";
-
- @Test
- public void testAvroSchemeAndSerializer() {
- List<String> fields = Lists.newArrayList("ID", "val");
- List<Object> o = Lists.newArrayList(1, "2");
-
- AvroSerializer serializer = new AvroSerializer(schemaString, fields);
- ByteBuffer byteBuffer = serializer.write(o, null);
-
- AvroScheme scheme = new AvroScheme(schemaString, fields);
- assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
- }
-
- @Test
- public void testAvroComplexSchemeAndSerializer() {
- List<String> fields = Lists.newArrayList("ID", "val1", "val2");
-
- Map<String,Long> mp = Maps.newHashMap();;
- mp.put("l1",1234L);
- mp.put("l2",56789L);
- List<String> ls = Lists.newArrayList("s1", "s2");
- List<Object> o = Lists.newArrayList(1, mp, ls);
-
- AvroSerializer serializer = new AvroSerializer(schemaString1, fields);
- ByteBuffer byteBuffer = serializer.write(o, null);
-
- AvroScheme scheme = new AvroScheme(schemaString1, fields);
- assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
deleted file mode 100644
index 0108949..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.serde.csv.CsvScheme;
-import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestCsvSerializer {
-
- @Test
- public void testCsvSchemeAndSerializer() {
- List<String> fields = Lists.newArrayList("ID", "val");
- List<Object> o = Lists.newArrayList("1", "2");
-
- CsvSerializer serializer = new CsvSerializer(fields);
- ByteBuffer byteBuffer = serializer.write(o, null);
-
- CsvScheme scheme = new CsvScheme(fields);
- assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-
- // Fields with embedded commas or double-quote characters
- fields = Lists.newArrayList("ID", "val", "v");
- o = Lists.newArrayList("1,9", "2,\"3\",5", "\"8\"");
-
- serializer = new CsvSerializer(fields);
- byteBuffer = serializer.write(o, null);
-
- scheme = new CsvScheme(fields);
- assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
deleted file mode 100644
index 6ca1877..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.storm.sql.runtime.serde.json.JsonScheme;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.utils.Utils;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestJsonRepresentation {
- @Test
- public void testJsonScheme() {
- final List<String> fields = Lists.newArrayList("ID", "val");
- final String s = "{\"ID\": 1, \"val\": \"2\"}";
- JsonScheme scheme = new JsonScheme(fields);
- List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
- assertArrayEquals(new Object[] {1, "2"}, o.toArray());
- }
-
- @Test
- public void testJsonSerializer() {
- final List<String> fields = Lists.newArrayList("ID", "val");
- List<Object> o = Lists.<Object> newArrayList(1, "2");
- JsonSerializer s = new JsonSerializer(fields);
- ByteBuffer buf = s.write(o, null);
- byte[] b = Utils.toByteArray(buf);
- assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
deleted file mode 100644
index 1798828..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
-import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestTsvSerializer {
-
- @Test
- public void testTsvSchemeAndSerializer() {
- final char delimiter = '\t';
-
- List<String> fields = Lists.newArrayList("ID", "val");
- List<Object> o = Lists.newArrayList("1", "2");
-
- TsvSerializer serializer = new TsvSerializer(fields, delimiter);
- ByteBuffer byteBuffer = serializer.write(o, null);
-
- TsvScheme scheme = new TsvScheme(fields, delimiter);
- assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
- }
-
-}