Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:15 UTC

[12/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
deleted file mode 100644
index a373483..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-public class StormSqlFunctions {
-  public static Boolean eq(Object b0, Object b1) {
-    if (b0 == null || b1 == null) {
-      return null;
-    }
-    return b0.equals(b1);
-  }
-
-  public static Boolean ne(Object b0, Object b1) {
-    if (b0 == null || b1 == null) {
-      return null;
-    }
-    return !b0.equals(b1);
-  }
-}

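For reference, a minimal sketch (not part of the commit) of how the null-propagating eq/ne helpers above behave; the values are illustrative only.

import org.apache.storm.sql.runtime.StormSqlFunctions;

public class StormSqlFunctionsExample {
    public static void main(String[] args) {
        // Both operands present: plain Object.equals() semantics.
        System.out.println(StormSqlFunctions.eq("a", "a"));  // true
        System.out.println(StormSqlFunctions.ne("a", "b"));  // true
        // Any null operand maps to SQL UNKNOWN, represented as a null Boolean.
        System.out.println(StormSqlFunctions.eq(null, "a")); // null
        System.out.println(StormSqlFunctions.ne("a", null)); // null
    }
}
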
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
deleted file mode 100644
index e78f354..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.calcite;
-
-import org.apache.calcite.interpreter.Context;
-
-public class DebuggableExecutableExpression implements ExecutableExpression {
-    private ExecutableExpression delegate;
-    private String delegateCode;
-
-    public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
-        this.delegate = delegate;
-        this.delegateCode = delegateCode;
-    }
-
-    @Override
-    public Object execute(Context context) {
-        return delegate.execute(context);
-    }
-
-    @Override
-    public void execute(Context context, Object[] results) {
-        delegate.execute(context, results);
-    }
-
-    public String getDelegateCode() {
-        return delegateCode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
deleted file mode 100644
index 8416945..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.calcite;
-
-import org.apache.calcite.interpreter.Context;
-
-import java.io.Serializable;
-
-/**
- * Compiled executable expression.
- */
-public interface ExecutableExpression extends Serializable {
-    Object execute(Context context);
-    void execute(Context context, Object[] results);
-}

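As a minimal sketch (not part of the commit), a trivial implementation of the interface above; the class name is hypothetical.

import org.apache.calcite.interpreter.Context;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;

// Hypothetical constant-valued expression, only to illustrate the two execute(...) overloads.
public class ConstantExpression implements ExecutableExpression {
    private final Object value;

    public ConstantExpression(Object value) {
        this.value = value;
    }

    @Override
    public Object execute(Context context) {
        return value;
    }

    @Override
    public void execute(Context context, Object[] results) {
        // Multi-slot variant: the caller provides the output array.
        results[0] = value;
    }
}
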
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
deleted file mode 100644
index 4861b43..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.calcite;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.runtime.Hook;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.util.Holder;
-
-import java.io.Serializable;
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * This is based on SlimDataContext in Calcite and borrows some logic from Calcite's DataContextImpl.
- */
-public class StormDataContext implements DataContext, Serializable {
-    private final ImmutableMap<Object, Object> map;
-
-    public StormDataContext() {
-        // Store the time at which the query started executing. The SQL
-        // standard says that functions such as CURRENT_TIMESTAMP return the
-        // same value throughout the query.
-
-        final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis());
-
-        // Give a hook chance to alter the clock.
-        Hook.CURRENT_TIME.run(timeHolder);
-        final long time = timeHolder.get();
-        final TimeZone timeZone = Calendar.getInstance().getTimeZone();
-        final long localOffset = timeZone.getOffset(time);
-        final long currentOffset = localOffset;
-
-        ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
-        builder.put(Variable.UTC_TIMESTAMP.camelName, time)
-                .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset)
-                .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
-                .put(Variable.TIME_ZONE.camelName, timeZone);
-        map = builder.build();
-    }
-
-    @Override
-    public SchemaPlus getRootSchema() {
-        return null;
-    }
-
-    @Override
-    public JavaTypeFactory getTypeFactory() {
-        return null;
-    }
-
-    @Override
-    public QueryProvider getQueryProvider() {
-        return null;
-    }
-
-    @Override
-    public Object get(String name) {
-        return map.get(name);
-    }
-}

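A minimal sketch (not part of the commit) showing the point of the class above: the timestamp variables are captured once, at construction time.

import org.apache.calcite.DataContext;
import org.apache.storm.sql.runtime.calcite.StormDataContext;

public class StormDataContextExample {
    public static void main(String[] args) throws InterruptedException {
        StormDataContext context = new StormDataContext();
        Long first = (Long) context.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
        Thread.sleep(100);
        Long second = (Long) context.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
        // Same value both times: CURRENT_TIMESTAMP is frozen per query, as the SQL standard requires.
        System.out.println(first.equals(second)); // true
    }
}
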
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
deleted file mode 100644
index d81e772..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
-import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Creates a socket data source based on the URI and properties. The URI has the format
- * socket://[host]:[port]; both host and port are mandatory.
- *
- * Note that it connects to the given host and port, receiving messages when used as an input
- * data source and sending messages when used as an output data source.
- */
-public class SocketDataSourcesProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-        return "socket";
-    }
-
-    private static class SocketTridentDataSource implements ISqlTridentDataSource {
-
-        private final String host;
-        private final int port;
-        private final Scheme scheme;
-        private final IOutputSerializer serializer;
-
-        SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
-            this.scheme = scheme;
-            this.serializer = serializer;
-            this.host = host;
-            this.port = port;
-        }
-
-        @Override
-        public ITridentDataSource getProducer() {
-            return new TridentSocketSpout(scheme, host, port);
-        }
-
-        @Override
-        public SqlTridentConsumer getConsumer() {
-            StateFactory stateFactory = new SocketState.Factory(host, port);
-            StateUpdater<SocketState> stateUpdater = new SocketStateUpdater(serializer);
-            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
-        }
-    }
-
-    @Override
-    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
-        String host = uri.getHost();
-        int port = uri.getPort();
-        if (port == -1) {
-            throw new RuntimeException("Port information is not available. URI: " + uri);
-        }
-
-        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
-        Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
-        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
-
-        return new SocketTridentDataSource(scheme, serializer, host, port);
-    }
-}

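A hedged usage sketch (not part of the commit) for the provider above. The FieldInfo constructor shape and the fallback to the default JSON serde for null format classes are assumptions not shown in this diff; host, port, and field names are illustrative.

import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.sql.runtime.datasource.socket.SocketDataSourcesProvider;

public class SocketProviderExample {
    public static void main(String[] args) {
        // Assumed FieldInfo(name, type, isPrimary) constructor.
        List<FieldInfo> fields = Arrays.asList(
                new FieldInfo("ID", Integer.class, true),
                new FieldInfo("NAME", String.class, false));

        ISqlTridentDataSource dataSource = new SocketDataSourcesProvider().constructTrident(
                URI.create("socket://localhost:8888"), // both host and port are mandatory
                null,  // input format class; assumed to fall back to the default JSON scheme
                null,  // output format class; assumed to fall back to the default JSON serializer
                new Properties(),
                fields);

        // getProducer() yields a TridentSocketSpout; getConsumer() pairs a SocketState.Factory
        // with a SocketStateUpdater, as constructed above.
        System.out.println(dataSource.getProducer());
    }
}
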
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
deleted file mode 100644
index 3f85756..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.Socket;
-import java.util.Map;
-
-/**
- * Trident State implementation backed by a socket. It only supports writing.
- */
-public class SocketState implements State {
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void beginCommit(Long txid) {
-        // no-op
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void commit(Long txid) {
-        // no-op
-    }
-
-    public static class Factory implements StateFactory {
-        private final String host;
-        private final int port;
-
-        public Factory(String host, int port) {
-            this.host = host;
-            this.port = port;
-        }
-
-        @Override
-        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            BufferedWriter out;
-            try {
-                Socket socket = new Socket(host, port);
-                out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
-            } catch (IOException e) {
-                throw new RuntimeException("Exception while initializing socket for State. host " +
-                        host + " port " + port, e);
-            }
-
-            // State doesn't have close() and Storm doesn't guarantee cleanup, so we can't release the socket resource anyway
-            return new SocketState(out);
-        }
-    }
-
-    private BufferedWriter out;
-
-    private SocketState(BufferedWriter out) {
-        this.out = out;
-    }
-
-    public void write(String str) throws IOException {
-        out.write(str);
-    }
-
-    public void flush() throws IOException {
-        out.flush();
-    }
-}
\ No newline at end of file

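A minimal sketch (not part of the commit) exercising SocketState.Factory above against a local server socket; the port and payload are illustrative.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;

import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
import org.apache.storm.trident.state.StateFactory;

public class SocketStateExample {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(8888)) {
            StateFactory factory = new SocketState.Factory("localhost", 8888);
            // makeState() opens the client socket; the connection is queued until accept().
            SocketState state = (SocketState) factory.makeState(new HashMap<>(), null, 0, 1);

            try (Socket client = server.accept();
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(client.getInputStream()))) {
                state.write("{\"ID\":1}\n");
                state.flush();
                System.out.println(reader.readLine()); // {"ID":1}
            }
        }
    }
}
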
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
deleted file mode 100644
index 3062a90..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * StateUpdater for SocketState. Serializes tuples one by one, writes them to the socket, and finally flushes.
- */
-public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
-    private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
-
-    private final IOutputSerializer outputSerializer;
-
-    public SocketStateUpdater(IOutputSerializer outputSerializer) {
-        this.outputSerializer = outputSerializer;
-    }
-
-    @Override
-    public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
-        try {
-            for (TridentTuple tuple : tuples) {
-                byte[] array = outputSerializer.write(tuple.getValues(), null).array();
-                String data = new String(array);
-                state.write(data + "\n");
-            }
-            state.flush();
-        } catch (IOException e) {
-            LOG.error("Error while updating state.", e);
-            collector.reportError(e);
-            throw new RuntimeException("Error while updating state.", e);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
deleted file mode 100644
index 97f63a7..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-
-/**
- * Trident spout for socket data. Only available for Storm SQL, and intended only for test purposes.
- */
-public class TridentSocketSpout implements IBatchSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
-
-    private final String host;
-    private final int port;
-    private final Scheme scheme;
-
-    private volatile boolean _running = true;
-
-    private BlockingDeque<String> queue;
-    private Socket socket;
-    private Thread readerThread;
-    private BufferedReader in;
-    private ObjectMapper objectMapper;
-
-    private Map<Long, List<List<Object>>> batches;
-
-    public TridentSocketSpout(Scheme scheme, String host, int port) {
-        this.scheme = scheme;
-        this.host = host;
-        this.port = port;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context) {
-        this.queue = new LinkedBlockingDeque<>();
-        this.objectMapper = new ObjectMapper();
-        this.batches = new HashMap<>();
-
-        try {
-            socket = new Socket(host, port);
-            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        } catch (IOException e) {
-            throw new RuntimeException("Error opening socket: host " + host + " port " + port, e);
-        }
-
-        readerThread = new Thread(new SocketReaderRunnable());
-        readerThread.start();
-    }
-
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        // read line and parse it to json
-        List<List<Object>> batch = this.batches.get(batchId);
-        if (batch == null) {
-            batch = new ArrayList<>();
-
-            while(queue.peek() != null) {
-                String line = queue.poll();
-                List<Object> values = convertLineToTuple(line);
-                if (values == null) {
-                    continue;
-                }
-
-                batch.add(values);
-            }
-
-            this.batches.put(batchId, batch);
-        }
-
-        for (List<Object> list : batch) {
-            collector.emit(list);
-        }
-    }
-
-    private List<Object> convertLineToTuple(String line) {
-        return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
-    }
-
-    @Override
-    public void ack(long batchId) {
-        this.batches.remove(batchId);
-    }
-
-    @Override
-    public void close() {
-        _running = false;
-        readerThread.interrupt();
-        queue.clear();
-
-        closeQuietly(in);
-        closeQuietly(socket);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return scheme.getOutputFields();
-    }
-
-    private class SocketReaderRunnable implements Runnable {
-        public void run() {
-            while (_running) {
-                try {
-                    String line = in.readLine();
-                    if (line == null) {
-                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
-                    }
-
-                    queue.push(line.trim());
-                } catch (Throwable t) {
-                    // This spout is added for test purposes, so just failing fast doesn't hurt much
-                    die(t);
-                }
-            }
-        }
-    }
-
-    private void die(Throwable t) {
-        LOG.error("Halting process: TridentSocketSpout died.", t);
-        if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
-            System.exit(11);
-        }
-    }
-
-    private void closeQuietly(final Closeable closeable) {
-        try {
-            if (closeable != null) {
-                closeable.close();
-            }
-        } catch (final IOException ioe) {
-            // ignore
-        }
-    }
-}

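A minimal wiring sketch (not part of the commit) for the test-only spout above; the stream name, field names, host, and port are illustrative.

import java.util.Arrays;

import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
import org.apache.storm.sql.runtime.serde.json.JsonScheme;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;

public class TridentSocketSpoutExample {
    public static void main(String[] args) {
        // Each line read from the socket is deserialized into one tuple with fields (ID, NAME).
        TridentSocketSpout spout = new TridentSocketSpout(
                new JsonScheme(Arrays.asList("ID", "NAME")), "localhost", 8888);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("socket-input", spout);
        // The spout caps parallelism at 1 via getComponentConfiguration().
    }
}
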
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
deleted file mode 100644
index 3bf1a23..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * AvroScheme uses generic readers (without code generation) instead of specific readers (with code generation).
- */
-public class AvroScheme implements Scheme {
-  private final String schemaString;
-  private final List<String> fieldNames;
-  private final CachedSchemas schemas;
-
-  public AvroScheme(String schemaString, List<String> fieldNames) {
-    this.schemaString = schemaString;
-    this.fieldNames = fieldNames;
-    this.schemas = new CachedSchemas();
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    try {
-      Schema schema = schemas.getSchema(schemaString);
-      DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
-      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
-      GenericRecord record = reader.read(null, decoder);
-
-      ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-      for (String field : fieldNames) {
-        Object value = record.get(field);
-        // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
-        list.add(SerdeUtils.convertAvroUtf8(value));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
deleted file mode 100644
index 5dc3393..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * AvroSerializer uses generic writers (without code generation) instead of specific writers (with code generation).
- */
-public class AvroSerializer implements IOutputSerializer, Serializable {
-  private final String schemaString;
-  private final List<String> fieldNames;
-  private final CachedSchemas schemas;
-
-  public AvroSerializer(String schemaString, List<String> fieldNames) {
-    this.schemaString = schemaString;
-    this.fieldNames = fieldNames;
-    this.schemas = new CachedSchemas();
-  }
-
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
-    try {
-      Schema schema = schemas.getSchema(schemaString);
-      GenericRecord record = new GenericData.Record(schema);
-      for (int i = 0; i < fieldNames.size(); i++) {
-        record.put(fieldNames.get(i), data.get(i));
-      }
-
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
-      Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
-      writer.write(record, encoder);
-      encoder.flush();
-      byte[] bytes = out.toByteArray();
-      out.close();
-      return ByteBuffer.wrap(bytes);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

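A minimal round-trip sketch (not part of the commit) combining the AvroScheme and AvroSerializer above; the record schema and values are illustrative.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;

public class AvroRoundTripExample {
    public static void main(String[] args) {
        String schema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                + "{\"name\":\"ID\",\"type\":\"int\"},"
                + "{\"name\":\"NAME\",\"type\":\"string\"}]}";
        List<String> fields = Arrays.asList("ID", "NAME");

        AvroSerializer serializer = new AvroSerializer(schema, fields);
        AvroScheme scheme = new AvroScheme(schema, fields);

        ByteBuffer encoded = serializer.write(Arrays.asList(1, "pear"), null);
        // Avro Utf8 values are converted back to java.lang.String by the scheme.
        System.out.println(scheme.deserialize(encoded)); // [1, pear]
    }
}
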
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
deleted file mode 100644
index 4f0e747..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.avro;
-
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-// TODO: this class is reserved for supporting messages with different schemas;
-// currently only one schema is kept in the cache.
-public class CachedSchemas implements Serializable {
-
-    private final Map<String, Schema> cache = new HashMap<>();
-
-    public Schema getSchema(String schemaString) {
-        Schema schema = cache.get(schemaString);
-        if (schema == null) {
-            schema = new Schema.Parser().parse(schemaString);
-            cache.put(schemaString, schema);
-        }
-        return schema;
-    }
-
-}
\ No newline at end of file

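A minimal sketch (not part of the commit) of the caching behaviour described above: the same schema string is parsed once and then served from the cache.

import org.apache.avro.Schema;
import org.apache.storm.sql.runtime.serde.avro.CachedSchemas;

public class CachedSchemasExample {
    public static void main(String[] args) {
        String schemaString = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                + "{\"name\":\"ID\",\"type\":\"int\"}]}";
        CachedSchemas schemas = new CachedSchemas();
        Schema first = schemas.getSchema(schemaString);
        Schema second = schemas.getSchema(schemaString);
        System.out.println(first == second); // true: parsed once, cached afterwards
    }
}
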
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
deleted file mode 100644
index 34fb1bb..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.csv;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * CsvScheme uses the standard RFC 4180 CSV parser.
- * One difference from the TSV format is that fields with embedded commas are quoted,
- * e.g. a,"b,c",d is allowed.
- *
- * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
- */
-public class CsvScheme implements Scheme {
-  private final List<String> fieldNames;
-
-  public CsvScheme(List<String> fieldNames) {
-    this.fieldNames = fieldNames;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    try {
-      String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
-      CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
-      CSVRecord record = parser.getRecords().get(0);
-      Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
-
-      ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-      for (int i = 0; i < record.size(); i++) {
-        list.add(record.get(i));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
-}

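A minimal sketch (not part of the commit) of the RFC 4180 quoting behaviour described above; field names and input are illustrative.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.csv.CsvScheme;

public class CsvSchemeExample {
    public static void main(String[] args) {
        CsvScheme scheme = new CsvScheme(Arrays.asList("A", "B", "C"));
        ByteBuffer line = ByteBuffer.wrap("a,\"b,c\",d".getBytes(StandardCharsets.UTF_8));
        // The quoted field keeps its embedded comma: [a, b,c, d]
        System.out.println(scheme.deserialize(line));
    }
}
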
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
deleted file mode 100644
index 0d3bd74..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.csv;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-/**
- * CsvSerializer uses the standard RFC 4180 CSV printer.
- * One difference from the TSV format is that fields with embedded commas are quoted,
- * e.g. a,"b,c",d is allowed.
- *
- * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
- */
-public class CsvSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fields; // reserved for future use
-
-  public CsvSerializer(List<String> fields) {
-    this.fields = fields;
-  }
-
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    try {
-      StringWriter writer = new StringWriter();
-      CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
-      for (Object o : data) {
-        printer.print(o);
-      }
-      // since we are writing to a StringWriter, we do not need to close it.
-      return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

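The matching output side, as a minimal sketch (not part of the commit): values with embedded commas come out quoted.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;

public class CsvSerializerExample {
    public static void main(String[] args) {
        CsvSerializer serializer = new CsvSerializer(Arrays.asList("A", "B", "C"));
        ByteBuffer out = serializer.write(Arrays.asList("a", "b,c", "d"), null);
        System.out.println(StandardCharsets.UTF_8.decode(out)); // a,"b,c",d
    }
}
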
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
deleted file mode 100644
index d288fa1..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.json;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class JsonScheme implements Scheme {
-  private final List<String> fields;
-
-  public JsonScheme(List<String> fields) {
-    this.fields = fields;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
-      ArrayList<Object> list = new ArrayList<>(fields.size());
-      for (String f : fields) {
-        list.add(map.get(f));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fields);
-  }
-}

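A minimal sketch (not part of the commit) of deserializing one JSON document with the scheme above; field names and payload are illustrative.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.json.JsonScheme;

public class JsonSchemeExample {
    public static void main(String[] args) {
        JsonScheme scheme = new JsonScheme(Arrays.asList("ID", "NAME"));
        ByteBuffer json = ByteBuffer.wrap(
                "{\"ID\":1,\"NAME\":\"pear\"}".getBytes(StandardCharsets.UTF_8));
        // Values are emitted in the declared field order: [1, pear]
        System.out.println(scheme.deserialize(json));
    }
}
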
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
deleted file mode 100644
index 1e825c4..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.json;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-public class JsonSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fieldNames;
-  private final JsonFactory jsonFactory;
-
-  public JsonSerializer(List<String> fieldNames) {
-    this.fieldNames = fieldNames;
-    jsonFactory = new JsonFactory();
-  }
-
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
-    StringWriter sw = new StringWriter();
-    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
-      jg.writeStartObject();
-      for (int i = 0; i < fieldNames.size(); ++i) {
-        jg.writeFieldName(fieldNames.get(i));
-        jg.writeObject(data.get(i));
-      }
-      jg.writeEndObject();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
-  }
-}

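And the matching output side, as a minimal sketch (not part of the commit).

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.json.JsonSerializer;

public class JsonSerializerExample {
    public static void main(String[] args) {
        JsonSerializer serializer = new JsonSerializer(Arrays.asList("ID", "NAME"));
        ByteBuffer out = serializer.write(Arrays.asList(1, "pear"), null);
        System.out.println(StandardCharsets.UTF_8.decode(out)); // {"ID":1,"NAME":"pear"}
    }
}
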
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
deleted file mode 100644
index 310494c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.tsv;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TsvScheme uses a simple delimited format implemented by splitting strings,
- * and it supports a user-defined delimiter.
- */
-public class TsvScheme implements Scheme {
-  private final List<String> fieldNames;
-  private final char delimiter;
-
-  public TsvScheme(List<String> fieldNames, char delimiter) {
-    this.fieldNames = fieldNames;
-    this.delimiter = delimiter;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
-    List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
-    Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
-
-    ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-    list.addAll(parts);
-    return list;
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
-}

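A minimal sketch (not part of the commit) of the delimiter-based scheme above, here with a tab delimiter; field names and input are illustrative.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;

public class TsvSchemeExample {
    public static void main(String[] args) {
        TsvScheme scheme = new TsvScheme(Arrays.asList("ID", "NAME"), '\t');
        ByteBuffer line = ByteBuffer.wrap("1\tpear".getBytes(StandardCharsets.UTF_8));
        // Each delimited part becomes one (string) value: [1, pear]
        System.out.println(scheme.deserialize(line));
    }
}
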
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
deleted file mode 100644
index 1cf1c76..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.serde.tsv;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-/**
- * TsvSerializer uses a simple delimited format implemented by joining values with the delimiter,
- * and it supports a user-defined delimiter.
- */
-public class TsvSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fields; // reserved for future use
-  private final char delimiter;
-
-  public TsvSerializer(List<String> fields, char delimiter) {
-    this.fields = fields;
-    this.delimiter = delimiter;
-  }
-
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    StringBuilder sb = new StringBuilder(512); // 512 is large enough for most cases, avoiding inner array resizing
-    for (int i = 0; i < data.size(); i++) {
-      Object o = data.get(i);
-      if (i == 0) {
-        sb.append(o);
-      } else {
-        sb.append(delimiter);
-        sb.append(o);
-      }
-    }
-    return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
-  }
-}

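And the matching output side, as a minimal sketch (not part of the commit): values are joined with the configured delimiter.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;

public class TsvSerializerExample {
    public static void main(String[] args) {
        TsvSerializer serializer = new TsvSerializer(Arrays.asList("ID", "NAME"), '\t');
        ByteBuffer out = serializer.write(Arrays.asList(1, "pear"), null);
        System.out.println(StandardCharsets.UTF_8.decode(out)); // 1<TAB>pear
    }
}
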
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
deleted file mode 100644
index 6c76481..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Map;
-
-public class EvaluationCalc implements OperationAwareFlatMapFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
-
-    private final ExecutableExpression filterInstance;
-    private final ExecutableExpression projectionInstance;
-    private final Object[] outputValues;
-    private final DataContext dataContext;
-
-    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
-        this.filterInstance = filterInstance;
-        this.projectionInstance = projectionInstance;
-        this.outputValues = new Object[outputCount];
-        this.dataContext = dataContext;
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
-        }
-        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public Iterable<Values> execute(TridentTuple input) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = input.getValues().toArray();
-
-        if (filterInstance != null) {
-            filterInstance.execute(calciteContext, outputValues);
-            // filtered out
-            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
-                return Collections.emptyList();
-            }
-        }
-
-        if (projectionInstance != null) {
-            projectionInstance.execute(calciteContext, outputValues);
-            return Collections.singletonList(new Values(outputValues));
-        } else {
-            return Collections.singletonList(new Values(input.getValues()));
-        }
-    }
-}

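For reference, the control flow that EvaluationCalc implements (evaluate the optional filter into a reusable output buffer, drop the row when the result is NULL or FALSE, then either run the projection or pass the input values through) can be restated in plain Java. This is only an illustrative sketch: SimpleExpression below is a hypothetical stand-in for the Janino-compiled ExecutableExpression, not a type from the Storm SQL runtime.

import java.util.Collections;
import java.util.List;

public class CalcFlowSketch {
    /** Hypothetical stand-in for the compiled ExecutableExpression. */
    interface SimpleExpression {
        void execute(Object[] row, Object[] results);
    }

    private final SimpleExpression filter;      // null when there is no WHERE clause
    private final SimpleExpression projection;  // null when the projection is a pass-through
    private final Object[] outputValues;        // buffer reused across rows, sized to the projection

    CalcFlowSketch(SimpleExpression filter, SimpleExpression projection, int outputCount) {
        this.filter = filter;
        this.projection = projection;
        this.outputValues = new Object[outputCount];
    }

    List<Object[]> execute(Object[] row) {
        if (filter != null) {
            filter.execute(row, outputValues);
            // SQL three-valued logic: NULL and FALSE both drop the row
            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
                return Collections.emptyList();
            }
        }
        if (projection != null) {
            projection.execute(row, outputValues);
            // copy the shared buffer so later rows do not overwrite the emitted values
            return Collections.singletonList(outputValues.clone());
        }
        return Collections.singletonList(row);
    }
}

The real class differs only in that it receives a TridentTuple, wraps the row in a Calcite StormContext, and emits Values.
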
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
deleted file mode 100644
index 9314852..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class EvaluationFilter extends BaseFilter {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
-
-    private final ExecutableExpression filterInstance;
-    private final DataContext dataContext;
-    private final Object[] outputValues;
-
-    public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
-        this.filterInstance = filterInstance;
-        this.dataContext = dataContext;
-        this.outputValues = new Object[1];
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = tuple.getValues().toArray();
-        filterInstance.execute(calciteContext, outputValues);
-        return (outputValues[0] != null && (boolean) outputValues[0]);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
deleted file mode 100644
index 2608104..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class EvaluationFunction implements OperationAwareMapFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
-
-    private final ExecutableExpression projectionInstance;
-    private final Object[] outputValues;
-    private final DataContext dataContext;
-
-    public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
-        this.projectionInstance = projectionInstance;
-        this.outputValues = new Object[outputCount];
-        this.dataContext = dataContext;
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        if (projectionInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public Values execute(TridentTuple input) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = input.getValues().toArray();
-        projectionInstance.execute(calciteContext, outputValues);
-        return new Values(outputValues);
-    }
-}

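EvaluationCalc, EvaluationFilter and EvaluationFunction are what the Trident planner attaches to a stream for a Calc, Filter or Project node. A minimal wiring sketch follows; it assumes the filter/map/flatMap overloads available on Stream in this branch, and that the compiled ExecutableExpression instances and the DataContext have already been produced by the planner (that step is not shown here).

import org.apache.calcite.DataContext;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
import org.apache.storm.trident.Stream;

public final class EvaluationWiringSketch {
    // WHERE clause only: keep the tuples for which the compiled predicate is TRUE.
    public static Stream where(Stream in, ExecutableExpression predicate, DataContext ctx) {
        return in.filter(new EvaluationFilter(predicate, ctx));
    }

    // Projection only: map every tuple through the compiled projection expressions.
    public static Stream select(Stream in, ExecutableExpression projection, int outputCount, DataContext ctx) {
        return in.map(new EvaluationFunction(projection, outputCount, ctx));
    }

    // Fused filter + projection: flatMap emits zero or one tuple per input.
    public static Stream calc(Stream in, ExecutableExpression predicate, ExecutableExpression projection,
                              int outputCount, DataContext ctx) {
        return in.flatMap(new EvaluationCalc(predicate, projection, outputCount, ctx));
    }
}
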
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
deleted file mode 100644
index 4c3a266..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class ForwardFunction extends BaseFunction {
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        collector.emit(tuple.getValues());
-    }
-}

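ForwardFunction simply re-emits each tuple's values unchanged, so it is used together with Stream.each, where the forwarded values reappear under the appended output field names. A usage sketch with illustrative field names:

import org.apache.storm.sql.runtime.trident.functions.ForwardFunction;
import org.apache.storm.trident.Stream;
import org.apache.storm.tuple.Fields;

public final class ForwardSketch {
    // The function fields must match the arity of the forwarded values:
    // two input fields here produce two appended output fields.
    public static Stream forward(Stream in) {
        return in.each(new Fields("ID", "val"), new ForwardFunction(), new Fields("ID_out", "val_out"));
    }
}
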
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
deleted file mode 100644
index efd5d25..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.FieldInfo;
-
-import java.io.Serializable;
-import java.util.List;
-
-public final class FieldInfoUtils {
-
-    public static List<String> getFieldNames(List<FieldInfo> fields) {
-        return Lists.transform(fields, new FieldNameExtractor());
-    }
-
-    private static class FieldNameExtractor implements Function<FieldInfo, String>, Serializable {
-        @Override
-        public String apply(FieldInfo fieldInfo) {
-            return fieldInfo.name();
-        }
-    }
-}

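FieldInfoUtils.getFieldNames is a thin wrapper over Guava's Lists.transform, which returns a lazy view of the input list. A small usage sketch; copying into an ArrayList is shown only as an option for callers that want an eager, independent list:

import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;

import java.util.ArrayList;
import java.util.List;

public final class FieldNamesSketch {
    // Extract the column names from a table schema.
    public static List<String> columnNames(List<FieldInfo> schema) {
        return new ArrayList<>(FieldInfoUtils.getFieldNames(schema));
    }
}
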
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
deleted file mode 100644
index 6b3dfc9..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.util.Utf8;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
-import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
-import org.apache.storm.sql.runtime.serde.csv.CsvScheme;
-import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;
-import org.apache.storm.sql.runtime.serde.json.JsonScheme;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
-import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
-import org.apache.storm.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public final class SerdeUtils {
-    public static Scheme getScheme(String inputFormatClass, Properties properties, List<String> fieldNames) {
-        Scheme scheme;
-        if (isNotEmpty(inputFormatClass)) {
-            if (JsonScheme.class.getName().equals(inputFormatClass)) {
-                scheme = new JsonScheme(fieldNames);
-            } else if (TsvScheme.class.getName().equals(inputFormatClass)) {
-                String delimiter = properties.getProperty("input.tsv.delimiter", "\t");
-                scheme = new TsvScheme(fieldNames, delimiter.charAt(0));
-            } else if (CsvScheme.class.getName().equals(inputFormatClass)) {
-                scheme = new CsvScheme(fieldNames);
-            } else if (AvroScheme.class.getName().equals(inputFormatClass)) {
-                String schemaString = properties.getProperty("input.avro.schema");
-                Preconditions.checkArgument(isNotEmpty(schemaString), "input.avro.schema can not be empty");
-                scheme = new AvroScheme(schemaString, fieldNames);
-            } else {
-                scheme = Utils.newInstance(inputFormatClass);
-            }
-        } else {
-            //use JsonScheme as the default scheme
-            scheme = new JsonScheme(fieldNames);
-        }
-        return scheme;
-    }
-
-    public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List<String> fieldNames) {
-        IOutputSerializer serializer;
-        if (isNotEmpty(outputFormatClass)) {
-            if (JsonSerializer.class.getName().equals(outputFormatClass)) {
-                serializer = new JsonSerializer(fieldNames);
-            } else if (TsvSerializer.class.getName().equals(outputFormatClass)) {
-                String delimiter = properties.getProperty("output.tsv.delimiter", "\t");
-                serializer = new TsvSerializer(fieldNames, delimiter.charAt(0));
-            } else if (CsvSerializer.class.getName().equals(outputFormatClass)) {
-                serializer = new CsvSerializer(fieldNames);
-            } else if (AvroSerializer.class.getName().equals(outputFormatClass)) {
-                String schemaString = properties.getProperty("output.avro.schema");
-                Preconditions.checkArgument(isNotEmpty(schemaString), "output.avro.schema can not be empty");
-                serializer = new AvroSerializer(schemaString, fieldNames);
-            } else {
-                serializer = Utils.newInstance(outputFormatClass);
-            }
-        } else {
-            //use JsonSerializer as the default serializer
-            serializer = new JsonSerializer(fieldNames);
-        }
-        return serializer;
-    }
-
-    public static Object convertAvroUtf8(Object value){
-        Object ret;
-        if (value instanceof Utf8) {
-            ret = value.toString();
-        } else if (value instanceof Map<?, ?>) {
-            ret = convertAvroUtf8Map((Map<Object,Object>)value);
-        } else if (value instanceof GenericData.Array) {
-            ret = convertAvroUtf8Array((GenericData.Array)value);
-        } else {
-            ret = value;
-        }
-        return ret;
-    }
-
-    public static Object convertAvroUtf8Map(Map<Object,Object> value) {
-        Map<Object, Object> map = new HashMap<>(value.size());
-        for (Map.Entry<Object, Object> entry : value.entrySet()) {
-            Object k = convertAvroUtf8(entry.getKey());
-            Object v = convertAvroUtf8(entry.getValue());
-            map.put(k, v);
-        }
-        return map;
-    }
-
-    public static Object convertAvroUtf8Array(GenericData.Array value){
-        List<Object> ls = new ArrayList<>(value.size());
-        for(Object o : value){
-            ls.add(convertAvroUtf8(o));
-        }
-        return ls;
-    }
-}

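getScheme and getSerializer dispatch on the fully qualified format class name, read their options from the supplied Properties, and fall back to JSON when no format class is given. A usage sketch driven by the same property keys the code above reads (the field names are illustrative):

import com.google.common.collect.Lists;
import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;
import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
import org.apache.storm.sql.runtime.utils.SerdeUtils;

import java.util.List;
import java.util.Properties;

public final class SerdeUtilsSketch {
    public static void main(String[] args) {
        List<String> fieldNames = Lists.newArrayList("ID", "val");

        // No input format class given: falls back to JsonScheme.
        Scheme defaultScheme = SerdeUtils.getScheme(null, new Properties(), fieldNames);

        // TSV input with a custom delimiter, set through the property key the code reads.
        Properties props = new Properties();
        props.setProperty("input.tsv.delimiter", "|");
        Scheme tsvScheme = SerdeUtils.getScheme(TsvScheme.class.getName(), props, fieldNames);

        // CSV output serializer selected by class name.
        IOutputSerializer csvOut = SerdeUtils.getSerializer(CsvSerializer.class.getName(), new Properties(), fieldNames);

        System.out.println(defaultScheme.getClass().getSimpleName());
        System.out.println(tsvScheme.getClass().getSimpleName());
        System.out.println(csvOut.getClass().getSimpleName());
    }
}
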
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
deleted file mode 100644
index a0f3af3..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime.utils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public final class Utils {
-
-    /**
-     * Splits a string into parts on a single-character delimiter.
-     * This is faster than String.split(String regex) because no regular expression is involved.
-     *
-     * @param data the string to split
-     * @param delimiter the delimiter character
-     * @return the parts, in order, including empty strings for empty fields
-     */
-    public static List<String> split(String data, char delimiter){
-        List<String> list = new LinkedList<>();
-        // iterate with charAt() to avoid the array copy that toCharArray() would make
-        StringBuilder sb = new StringBuilder(512);
-        int len = data.length();
-        for (int i=0; i < len; i++) {
-            char ch = data.charAt(i);
-            if(ch == delimiter){
-                list.add(sb.toString());
-                sb.setLength(0);
-                if(i == len - 1){
-                    list.add("");
-                }
-            }else{
-                sb.append(ch);
-            }
-        }
-        if (sb.length() > 0) {
-            list.add(sb.toString());
-        }
-        return list;
-    }
-}

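A short example of the edge-case behaviour of split above: empty fields between delimiters are kept, and a trailing delimiter yields a trailing empty string.

import org.apache.storm.sql.runtime.utils.Utils;

import java.util.List;

public final class SplitSketch {
    public static void main(String[] args) {
        List<String> parts = Utils.split("a\t\tb\t", '\t');
        // Prints: [a, , b, ] -- the empty middle field and the trailing empty field are preserved
        System.out.println(parts);
    }
}
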
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 9a945f7..0000000
--- a/external/sql/storm-sql-runtime/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.runtime.datasource.socket.SocketDataSourcesProvider
\ No newline at end of file

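The file above is a standard java.util.ServiceLoader registration. A minimal sketch of how such providers are discovered at runtime; printing the class names is illustrative only, the real registry in storm-sql-runtime keeps the discovered providers for lookup.

import org.apache.storm.sql.runtime.DataSourcesProvider;

import java.util.ServiceLoader;

public final class ProviderDiscoverySketch {
    public static void main(String[] args) {
        // Iterate every provider registered under
        // META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider on the classpath.
        for (DataSourcesProvider provider : ServiceLoader.load(DataSourcesProvider.class)) {
            System.out.println(provider.getClass().getName());
        }
    }
}
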
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
deleted file mode 100644
index 174bfde..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestAvroSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
-import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestAvroSerializer {
-  private static final String schemaString = "{\"type\":\"record\"," +
-          "\"name\":\"avrotest\"," +
-          "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"}," +
-          "{ \"name\":\"val\", \"type\":\"string\" }]}";
-
-  private static final String schemaString1 = "{\"type\":\"record\"," +
-          "\"name\":\"avrotest\"," +
-          "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"}," +
-          "{ \"type\":{\"type\":\"map\",\"values\": \"long\"}, \"name\":\"val1\" }," +
-          "{ \"type\":{\"type\":\"array\",\"items\": \"string\"}, \"name\":\"val2\" }]}";
-
-  @Test
-  public void testAvroSchemeAndSerializer() {
-    List<String> fields = Lists.newArrayList("ID", "val");
-    List<Object> o = Lists.newArrayList(1, "2");
-
-    AvroSerializer serializer = new AvroSerializer(schemaString, fields);
-    ByteBuffer byteBuffer = serializer.write(o, null);
-
-    AvroScheme scheme = new AvroScheme(schemaString, fields);
-    assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-  }
-
-  @Test
-  public void testAvroComplexSchemeAndSerializer() {
-    List<String> fields = Lists.newArrayList("ID", "val1", "val2");
-
-    Map<String, Long> mp = Maps.newHashMap();
-    mp.put("l1",1234L);
-    mp.put("l2",56789L);
-    List<String> ls = Lists.newArrayList("s1", "s2");
-    List<Object> o = Lists.newArrayList(1, mp, ls);
-
-    AvroSerializer serializer = new AvroSerializer(schemaString1, fields);
-    ByteBuffer byteBuffer = serializer.write(o, null);
-
-    AvroScheme scheme = new AvroScheme(schemaString1, fields);
-    assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
deleted file mode 100644
index 0108949..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestCsvSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.serde.csv.CsvScheme;
-import org.apache.storm.sql.runtime.serde.csv.CsvSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestCsvSerializer {
-
-  @Test
-  public void testCsvSchemeAndSerializer() {
-    List<String> fields = Lists.newArrayList("ID", "val");
-    List<Object> o = Lists.newArrayList("1", "2");
-
-    CsvSerializer serializer = new CsvSerializer(fields);
-    ByteBuffer byteBuffer = serializer.write(o, null);
-
-    CsvScheme scheme = new CsvScheme(fields);
-    assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-
-    // Fields with embedded commas or double-quote characters
-    fields = Lists.newArrayList("ID", "val", "v");
-    o = Lists.newArrayList("1,9", "2,\"3\",5", "\"8\"");
-
-    serializer = new CsvSerializer(fields);
-    byteBuffer = serializer.write(o, null);
-
-    scheme = new CsvScheme(fields);
-    assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
deleted file mode 100644
index 6ca1877..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.storm.sql.runtime.serde.json.JsonScheme;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.utils.Utils;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestJsonRepresentation {
-  @Test
-  public void testJsonScheme() {
-    final List<String> fields = Lists.newArrayList("ID", "val");
-    final String s = "{\"ID\": 1, \"val\": \"2\"}";
-    JsonScheme scheme = new JsonScheme(fields);
-    List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
-    assertArrayEquals(new Object[] {1, "2"}, o.toArray());
-  }
-
-  @Test
-  public void testJsonSerializer() {
-    final List<String> fields = Lists.newArrayList("ID", "val");
-    List<Object> o = Lists.<Object> newArrayList(1, "2");
-    JsonSerializer s = new JsonSerializer(fields);
-    ByteBuffer buf = s.write(o, null);
-    byte[] b = Utils.toByteArray(buf);
-    assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
deleted file mode 100644
index 1798828..0000000
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestTsvSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
-import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-
-public class TestTsvSerializer {
-
-  @Test
-  public void testTsvSchemeAndSerializer() {
-    final char delimiter = '\t';
-
-    List<String> fields = Lists.newArrayList("ID", "val");
-    List<Object> o = Lists.newArrayList("1", "2");
-
-    TsvSerializer serializer = new TsvSerializer(fields, delimiter);
-    ByteBuffer byteBuffer = serializer.write(o, null);
-
-    TsvScheme scheme = new TsvScheme(fields, delimiter);
-    assertArrayEquals(o.toArray(), scheme.deserialize(byteBuffer).toArray());
-  }
-
-}