You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:05 UTC

[02/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
new file mode 100644
index 0000000..3b5eedd
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+public class Channels { // static factory methods for ChannelContext instances
+  private static final ChannelContext VOID_CTX = new ChannelContext() { // shared context whose methods all do nothing
+    @Override
+    public void emit(Values data) {} // data is dropped
+
+    @Override
+    public void fireChannelInactive() {} // no-op
+
+    @Override
+    public void flush() {
+      // intentionally empty
+    }
+
+    @Override
+    public void setSource(java.lang.Object source) {
+      // intentionally empty: the source is not stored
+    }
+  };
+  // Context that routes every call to a handler, supplying next as the downstream context.
+  private static class ChannelContextAdapter implements ChannelContext {
+    private final ChannelHandler handler;
+    private final ChannelContext next;
+
+    public ChannelContextAdapter(
+        ChannelContext next, ChannelHandler handler) {
+      this.handler = handler;
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      handler.dataReceived(next, data); // handler receives the data together with the downstream context
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      handler.channelInactive(next);
+    }
+
+    @Override
+    public void flush() {
+      handler.flush(next);
+    }
+
+    @Override
+    public void setSource(java.lang.Object source) {
+      handler.setSource(next, source); // NOTE(review): if the handler also forwards to next, next sees the source twice - confirm
+      next.setSource(source); // propagate through the chain
+    }
+  }
+  // Pass-through context: each call goes straight to next. It is not instantiated anywhere in this class.
+  private static class ForwardingChannelContext implements ChannelContext {
+    private final ChannelContext next;
+
+    public ForwardingChannelContext(ChannelContext next) {
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      next.emit(data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      next.fireChannelInactive();
+    }
+
+    @Override
+    public void flush() {
+      next.flush();
+    }
+
+    @Override
+    public void setSource(Object source) {
+      next.setSource(source);
+    }
+  }
+  // Returns a new context that hands each call to handler, with next as the downstream context.
+  public static ChannelContext chain(
+      ChannelContext next, ChannelHandler handler) {
+    return new ChannelContextAdapter(next, handler);
+  }
+  // Returns the shared no-op context; the same instance is returned on every call.
+  public static ChannelContext voidContext() {
+    return VOID_CTX;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
new file mode 100644
index 0000000..352af73
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuples to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+  void open(ChannelContext ctx); // presumably ctx is where the source emits its tuples - confirm against implementations
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
new file mode 100644
index 0000000..dbece9c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+public interface DataSourcesProvider {
+  /**
+   * @return the scheme of the data source (compared with the URI scheme on lookup)
+   */
+  String scheme();
+
+  /**
+   * Construct a new data source.
+   * @param uri The URI that specifies the data source. The format of the URI
+   *            is fully customizable.
+   * @param inputFormatClass the name of the class that deserializes data.
+   *                         It is null when unspecified.
+   * @param outputFormatClass the name of the class that serializes data. It
+   *                          is null when unspecified.
+   * @param fields The names of the fields and the schema of the table.
+   */
+  DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields);
+  // Trident variant of construct(): same arguments plus a Properties object.
+  ISqlTridentDataSource constructTrident(
+          URI uri, String inputFormatClass, String outputFormatClass,
+          Properties properties, List<FieldInfo> fields);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
new file mode 100644
index 0000000..dfefb01
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+public class DataSourcesRegistry { // static registry of DataSourcesProvider instances, keyed by scheme
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DataSourcesRegistry.class);
+  private static final Map<String, DataSourcesProvider> providers;
+
+  static { // runs once at class initialization: registers every provider that ServiceLoader finds
+    providers = new HashMap<>();
+    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+        DataSourcesProvider.class);
+    for (DataSourcesProvider p : loader) {
+      LOG.info("Registering scheme {} with {}", p.scheme(), p);
+      providers.put(p.scheme(), p); // a provider whose scheme is already present replaces the earlier one
+    }
+  }
+
+  private DataSourcesRegistry() { // static-only class; cannot be instantiated from outside
+  }
+  // Builds a DataSource through the provider registered for uri's scheme; returns null when there is none.
+  public static DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null; // no provider registered for this scheme
+    }
+
+    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
+  }
+  // Trident counterpart of construct(); likewise returns null for an unregistered scheme.
+  public static ISqlTridentDataSource constructTridentDataSource(
+          URI uri, String inputFormatClass, String outputFormatClass,
+          Properties properties, List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null; // no provider registered for this scheme
+    }
+
+    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+  }
+
+  /**
+   * Allow unit tests to inject data sources. Returns the live, mutable registry map itself, not a copy.
+   */
+  public static Map<String, DataSourcesProvider> providerMap() {
+    return providers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
new file mode 100644
index 0000000..03b030b
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.io.Serializable;
+
+/**
+ * Describes a single field (column): its name, its type and whether it is primary.
+ */
+public class FieldInfo implements Serializable {
+  private final String name;
+  private final Class<?> type;
+  private final boolean isPrimary; // presumably marks a primary-key column - confirm with callers
+
+  public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+    this.name = name;
+    this.type = type;
+    this.isPrimary = isPrimary;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Class<?> type() {
+    return type;
+  }
+
+  public boolean isPrimary() {
+    return isPrimary;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
new file mode 100644
index 0000000..b6670d9
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface IOutputSerializer {
+  /**
+   * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+   * memory.
+   *
+   * @return a ByteBuffer that contains the serialized result.
+   */
+  ByteBuffer write(List<Object> data, ByteBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
new file mode 100644
index 0000000..9eae5ae
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+/**
+ * An ISqlTridentDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlTridentDataSource {
+  /**
+   * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
+   *
+   * Please note that StateFactory and StateUpdater should use the same class which implements State.
+   *
+   * @see org.apache.storm.trident.state.StateFactory
+   * @see org.apache.storm.trident.state.StateUpdater
+   */
+  interface SqlTridentConsumer {
+    StateFactory getStateFactory();
+    StateUpdater getStateUpdater();
+  }
+
+  /**
+   * Provides an instance of ITridentDataSource which can be used as a producer in Trident.
+   *
+   * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
+   * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
+   * - IBatchSpout
+   * - ITridentSpout
+   * - IPartitionedTridentSpout
+   * - IOpaquePartitionedTridentSpout
+   *
+   * @see org.apache.storm.trident.spout.ITridentDataSource
+   * @see org.apache.storm.trident.spout.IBatchSpout
+   * @see org.apache.storm.trident.spout.ITridentSpout
+   * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
+   * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
+   */
+  ITridentDataSource getProducer();
+
+  /**
+   * Provides an instance of SqlTridentConsumer which can be used as a consumer (State) in Trident.
+   *
+   * @see SqlTridentConsumer
+   */
+  SqlTridentConsumer getConsumer();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
new file mode 100644
index 0000000..c9abd16
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer { // plain holder for a StateFactory/StateUpdater pair
+    private final StateFactory stateFactory;
+    private final StateUpdater stateUpdater;
+
+    public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) { // stores both arguments as given; no validation
+        this.stateFactory = stateFactory;
+        this.stateUpdater = stateUpdater;
+    }
+
+    @Override
+    public StateFactory getStateFactory() {
+        return stateFactory;
+    }
+
+    @Override
+    public StateUpdater getStateUpdater() {
+        return stateUpdater;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
new file mode 100644
index 0000000..a373483
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+public class StormSqlFunctions { // static comparison helpers that return null when either operand is null
+  public static Boolean eq(Object b0, Object b1) { // b0.equals(b1), or null if either argument is null
+    if (b0 == null || b1 == null) {
+      return null; // a null operand yields a null (not false) result
+    }
+    return b0.equals(b1);
+  }
+
+  public static Boolean ne(Object b0, Object b1) { // !b0.equals(b1), or null if either argument is null
+    if (b0 == null || b1 == null) {
+      return null; // a null operand yields a null (not true) result
+    }
+    return !b0.equals(b1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
new file mode 100644
index 0000000..e78f354
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+public class DebuggableExecutableExpression implements ExecutableExpression { // delegating wrapper that also carries a code string
+    private ExecutableExpression delegate;
+    private String delegateCode; // presumably the source text of delegate - confirm at the construction site
+
+    public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
+        this.delegate = delegate;
+        this.delegateCode = delegateCode;
+    }
+
+    @Override
+    public Object execute(Context context) {
+        return delegate.execute(context); // forwarded unchanged to the wrapped expression
+    }
+
+    @Override
+    public void execute(Context context, Object[] results) {
+        delegate.execute(context, results); // forwarded unchanged to the wrapped expression
+    }
+
+    public String getDelegateCode() {
+        return delegateCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
new file mode 100644
index 0000000..8416945
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+import java.io.Serializable;
+
+/**
+ * Compiled executable expression. Extends Serializable, so implementations are expected to be serializable.
+ */
+public interface ExecutableExpression extends Serializable {
+    Object execute(Context context); // form that returns its result as a single Object
+    void execute(Context context, Object[] results); // presumably stores its outputs into results - confirm with implementations
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
new file mode 100644
index 0000000..4861b43
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.calcite;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Holder;
+
+import java.io.Serializable;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * This is based on SlimDataContext in Calcite, and borrows some code from DataContextImpl in Calcite.
+ */
+public class StormDataContext implements DataContext, Serializable {
+    private final ImmutableMap<Object, Object> map;
+
+    public StormDataContext() {
+        // Store the time at which the query started executing. The SQL
+        // standard says that functions such as CURRENT_TIMESTAMP return the
+        // same value throughout the query.
+
+        final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis());
+
+        // Give a hook a chance to alter the clock.
+        Hook.CURRENT_TIME.run(timeHolder);
+        final long time = timeHolder.get();
+        final TimeZone timeZone = Calendar.getInstance().getTimeZone();
+        final long localOffset = timeZone.getOffset(time);
+        final long currentOffset = localOffset; // same value, so CURRENT_TIMESTAMP and LOCAL_TIMESTAMP below are equal
+
+        ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
+        builder.put(Variable.UTC_TIMESTAMP.camelName, time)
+                .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset)
+                .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
+                .put(Variable.TIME_ZONE.camelName, timeZone);
+        map = builder.build();
+    }
+
+    @Override
+    public SchemaPlus getRootSchema() {
+        return null; // this context supplies no root schema
+    }
+
+    @Override
+    public JavaTypeFactory getTypeFactory() {
+        return null; // this context supplies no type factory
+    }
+
+    @Override
+    public QueryProvider getQueryProvider() {
+        return null; // this context supplies no query provider
+    }
+
+    @Override
+    public Object get(String name) {
+        return map.get(name); // only the four variables stored by the constructor are present; null otherwise
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
new file mode 100644
index 0000000..0e65220
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
+import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
+import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * {@link DataSourcesProvider} for the {@code socket} scheme. URIs have the format
+ * {@code socket://[host]:[port]}; a URI without a port is rejected.
+ *
+ * <p>The resulting data source connects to the given host and port. It receives messages when it is
+ * used as an input source, and sends messages when it is used as an output source.
+ */
+public class SocketDataSourcesProvider implements DataSourcesProvider {
+    @Override
+    public String scheme() {
+        return "socket";
+    }
+
+    /**
+     * Not supported: this provider only builds Trident data sources.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds a Trident data source that talks to the host and port named by {@code uri}.
+     *
+     * @throws RuntimeException if {@code uri} carries no port
+     */
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
+        final int targetPort = uri.getPort();
+        if (targetPort == -1) {
+            throw new RuntimeException("Port information is not available. URI: " + uri);
+        }
+        final String targetHost = uri.getHost();
+
+        final List<String> columnNames = FieldInfoUtils.getFieldNames(fields);
+        final Scheme inputScheme = SerdeUtils.getScheme(inputFormatClass, properties, columnNames);
+        final IOutputSerializer outputSerializer = SerdeUtils.getSerializer(outputFormatClass, properties, columnNames);
+
+        return new SocketTridentDataSource(inputScheme, outputSerializer, targetHost, targetPort);
+    }
+
+    /**
+     * Trident data source whose producer reads from, and whose consumer writes to, a socket.
+     */
+    private static class SocketTridentDataSource implements ISqlTridentDataSource {
+
+        private final String host;
+        private final int port;
+        private final Scheme scheme;
+        private final IOutputSerializer serializer;
+
+        SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
+            this.host = host;
+            this.port = port;
+            this.scheme = scheme;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public ITridentDataSource getProducer() {
+            return new TridentSocketSpout(scheme, host, port);
+        }
+
+        @Override
+        public SqlTridentConsumer getConsumer() {
+            final StateFactory socketStateFactory = new SocketState.Factory(host, port);
+            final StateUpdater<SocketState> socketStateUpdater = new SocketStateUpdater(serializer);
+            return new SimpleSqlTridentConsumer(socketStateFactory, socketStateUpdater);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
new file mode 100644
index 0000000..3f85756
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+/**
+ * Trident State implementation of Socket. It only supports writing.
+ *
+ * <p>Text is written through a {@link BufferedWriter} that uses the platform default charset, so
+ * callers must invoke {@link #flush()} to push buffered data to the socket.
+ */
+public class SocketState implements State {
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void beginCommit(Long txid) {
+        // no-op
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void commit(Long txid) {
+        // no-op
+    }
+
+    /**
+     * Creates {@link SocketState} instances connected to a fixed host and port.
+     */
+    public static class Factory implements StateFactory {
+        private final String host;
+        private final int port;
+
+        public Factory(String host, int port) {
+            this.host = host;
+            this.port = port;
+        }
+
+        /**
+         * Opens a socket to the configured host and port and wraps its output stream in a state.
+         *
+         * @throws RuntimeException if the socket cannot be opened or its output stream obtained
+         */
+        @Override
+        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+            Socket socket = null;
+            try {
+                socket = new Socket(host, port);
+                BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
+
+                // State doesn't have close() and Storm actually doesn't guarantee so we can't release
+                // the socket resource once the state has been handed out.
+                return new SocketState(out);
+            } catch (IOException e) {
+                // The state was never created, so nothing else can release a half-initialized socket.
+                closeQuietly(socket);
+                throw new RuntimeException("Exception while initializing socket for State. host " +
+                        host + " port " + port, e);
+            }
+        }
+
+        /** Closes the socket if it was opened, ignoring any failure: the original error is what matters. */
+        private static void closeQuietly(Socket socket) {
+            if (socket == null) {
+                return;
+            }
+            try {
+                socket.close();
+            } catch (IOException ignored) {
+                // best-effort cleanup on an already failing path
+            }
+        }
+    }
+
+    private final BufferedWriter out;
+
+    private SocketState(BufferedWriter out) {
+        this.out = out;
+    }
+
+    /**
+     * Appends {@code str} to the buffered socket writer.
+     *
+     * @throws IOException if the underlying writer fails
+     */
+    public void write(String str) throws IOException {
+        out.write(str);
+    }
+
+    /**
+     * Flushes buffered data to the socket.
+     *
+     * @throws IOException if the underlying writer fails
+     */
+    public void flush() throws IOException {
+        out.flush();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
new file mode 100644
index 0000000..3062a90
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
+ */
+public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
+    private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
+
+    private final IOutputSerializer outputSerializer;
+
+    public SocketStateUpdater(IOutputSerializer outputSerializer) {
+        this.outputSerializer = outputSerializer;
+    }
+
+    /**
+     * Writes every tuple of the batch to {@code state} as one newline-terminated line, then flushes.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if writing or flushing fails; the
+     *     error is also logged and reported to {@code collector}
+     */
+    @Override
+    public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            for (TridentTuple tuple : tuples) {
+                String data = toText(outputSerializer.write(tuple.getValues(), null));
+                state.write(data + "\n");
+            }
+            state.flush();
+        } catch (IOException e) {
+            LOG.error("Error while updating state.", e);
+            collector.reportError(e);
+            throw new RuntimeException("Error while updating state.", e);
+        }
+    }
+
+    /**
+     * Decodes the readable bytes of {@code buffer} (position to limit) with the platform default
+     * charset, which is also the charset the SocketState writer encodes with.
+     *
+     * <p>Copying through a duplicate honours position, limit and array offset, works for buffers
+     * without an accessible backing array, and leaves the caller's buffer position untouched.
+     */
+    private static String toText(java.nio.ByteBuffer buffer) {
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.duplicate().get(bytes);
+        return new String(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
new file mode 100644
index 0000000..97f63a7
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
+ */
+public class TridentSocketSpout implements IBatchSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
+
+    private final String host;
+    private final int port;
+    private final Scheme scheme;
+
+    private volatile boolean _running = true;
+
+    private BlockingDeque<String> queue;
+    private Socket socket;
+    private Thread readerThread;
+    private BufferedReader in;
+    private ObjectMapper objectMapper;
+
+    private Map<Long, List<List<Object>>> batches;
+
+    public TridentSocketSpout(Scheme scheme, String host, int port) {
+        this.scheme = scheme;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+        this.queue = new LinkedBlockingDeque<>();
+        this.objectMapper = new ObjectMapper();
+        this.batches = new HashMap<>();
+
+        try {
+            socket = new Socket(host, port);
+            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+        } catch (IOException e) {
+            throw new RuntimeException("Error opening socket: host " + host + " port " + port);
+        }
+
+        readerThread = new Thread(new SocketReaderRunnable());
+        readerThread.start();
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        // read line and parse it to json
+        List<List<Object>> batch = this.batches.get(batchId);
+        if (batch == null) {
+            batch = new ArrayList<>();
+
+            while(queue.peek() != null) {
+                String line = queue.poll();
+                List<Object> values = convertLineToTuple(line);
+                if (values == null) {
+                    continue;
+                }
+
+                batch.add(values);
+            }
+
+            this.batches.put(batchId, batch);
+        }
+
+        for (List<Object> list : batch) {
+            collector.emit(list);
+        }
+    }
+
+    private List<Object> convertLineToTuple(String line) {
+        return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
+    }
+
+    @Override
+    public void ack(long batchId) {
+        this.batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+        _running = false;
+        readerThread.interrupt();
+        queue.clear();
+
+        closeQuietly(in);
+        closeQuietly(socket);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return scheme.getOutputFields();
+    }
+
+    private class SocketReaderRunnable implements Runnable {
+        public void run() {
+            while (_running) {
+                try {
+                    String line = in.readLine();
+                    if (line == null) {
+                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
+                    }
+
+                    queue.push(line.trim());
+                } catch (Throwable t) {
+                    // This spout is added to test purpose, so just failing fast doesn't hurt much
+                    die(t);
+                }
+            }
+        }
+    }
+
+    private void die(Throwable t) {
+        LOG.error("Halting process: TridentSocketSpout died.", t);
+        if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+            System.exit(11);
+        }
+    }
+
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            if (closeable != null) {
+                closeable.close();
+            }
+        } catch (final IOException ioe) {
+            // ignore
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
new file mode 100644
index 0000000..3bf1a23
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Scheme that decodes Avro binary payloads with the generic (no code generation) reader rather than
+ * the specific (code-generated) one.
+ */
+public class AvroScheme implements Scheme {
+  private final String schemaString;
+  private final List<String> fieldNames;
+  private final CachedSchemas schemas;
+
+  public AvroScheme(String schemaString, List<String> fieldNames) {
+    this.schemas = new CachedSchemas();
+    this.fieldNames = fieldNames;
+    this.schemaString = schemaString;
+  }
+
+  /**
+   * Decodes one Avro record from {@code ser} and returns the values of the configured fields, in
+   * the order of the field names.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while decoding
+   */
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    try {
+      Schema avroSchema = schemas.getSchema(schemaString);
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
+      BinaryDecoder input = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
+      GenericRecord decoded = datumReader.read(null, input);
+
+      ArrayList<Object> values = new ArrayList<>(fieldNames.size());
+      for (String name : fieldNames) {
+        // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
+        values.add(SerdeUtils.convertAvroUtf8(decoded.get(name)));
+      }
+      return values;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fieldNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
new file mode 100644
index 0000000..5dc3393
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Output serializer that encodes rows as Avro binary with the generic (no code generation) writer
+ * rather than the specific (code-generated) one.
+ */
+public class AvroSerializer implements IOutputSerializer, Serializable {
+  private final String schemaString;
+  private final List<String> fieldNames;
+  private final CachedSchemas schemas;
+
+  public AvroSerializer(String schemaString, List<String> fieldNames) {
+    this.schemas = new CachedSchemas();
+    this.fieldNames = fieldNames;
+    this.schemaString = schemaString;
+  }
+
+  /**
+   * Encodes {@code data} as one Avro record, pairing each value with the field name at the same
+   * index. The {@code buffer} argument is not used; a newly wrapped buffer is returned.
+   *
+   * @throws IllegalArgumentException if {@code data} is null or its size differs from the number of fields
+   * @throws RuntimeException wrapping any {@link IOException} raised while encoding
+   */
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
+    try (ByteArrayOutputStream sink = new ByteArrayOutputStream()) {
+      Schema avroSchema = schemas.getSchema(schemaString);
+      GenericRecord row = new GenericData.Record(avroSchema);
+      int index = 0;
+      for (String name : fieldNames) {
+        row.put(name, data.get(index));
+        index++;
+      }
+
+      DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(row.getSchema());
+      Encoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(sink, null);
+      datumWriter.write(row, binaryEncoder);
+      binaryEncoder.flush();
+      return ByteBuffer.wrap(sink.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
new file mode 100644
index 0000000..4f0e747
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+// TODO: this class is reserved for supporting messages with different schemas.
+// Currently only one schema ends up in the cache.
+/**
+ * Cache of parsed Avro schemas, keyed by the schema's string form.
+ */
+public class CachedSchemas implements Serializable{
+
+    private final Map<String, Schema> cache = new HashMap<>();
+
+    /**
+     * Returns the parsed schema for {@code schemaString}, parsing and caching it on first use.
+     */
+    public Schema getSchema(String schemaString) {
+        Schema cached = cache.get(schemaString);
+        if (cached != null) {
+            return cached;
+        }
+
+        Schema parsed = new Schema.Parser().parse(schemaString);
+        cache.put(schemaString, parsed);
+        return parsed;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
new file mode 100644
index 0000000..34fb1bb
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.csv;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CsvScheme uses the standard RFC4180 CSV Parser.
+ * One of the differences from the Tsv format is that fields with embedded commas will be quoted.
+ * eg: a,"b,c",d is allowed.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
+ */
+public class CsvScheme implements Scheme {
+  private final List<String> fieldNames;
+
+  public CsvScheme(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+  }
+
+  /**
+   * Parses {@code ser} as UTF-8 CSV text and returns the fields of its first record as strings.
+   *
+   * @throws IllegalArgumentException if the input contains no record, or if the first record does
+   *     not have exactly as many fields as this scheme has field names
+   * @throws RuntimeException wrapping any {@link IOException} raised while parsing
+   */
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    // try-with-resources: CSVParser is Closeable and was previously never closed.
+    try (CSVParser parser = CSVParser.parse(
+        new String(Utils.toByteArray(ser), StandardCharsets.UTF_8), CSVFormat.RFC4180)) {
+      List<CSVRecord> records = parser.getRecords();
+      // Fail with a descriptive message rather than an IndexOutOfBoundsException on empty input.
+      Preconditions.checkArgument(!records.isEmpty(), "No CSV record found in input");
+      CSVRecord record = records.get(0);
+      Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
+
+      ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+      for (int i = 0; i < record.size(); i++) {
+        list.add(record.get(i));
+      }
+      return list;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fieldNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
new file mode 100644
index 0000000..0d3bd74
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Output serializer that renders a row as a single line of RFC4180 CSV.
+ * Unlike the Tsv format, fields with embedded commas are quoted,
+ * eg: a,"b,c",d is allowed.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
+ */
+public class CsvSerializer implements IOutputSerializer, Serializable {
+  private final List<String> fields; //reserved for future
+
+  public CsvSerializer(List<String> fields) {
+    this.fields = fields;
+  }
+
+  /**
+   * Prints every value of {@code data} as one CSV field and returns the UTF-8 bytes of the result.
+   * No record separator is appended, and the {@code buffer} argument is not used.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while printing
+   */
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    try {
+      StringWriter sink = new StringWriter();
+      CSVPrinter csv = new CSVPrinter(sink, CSVFormat.RFC4180);
+      for (Object value : data) {
+        csv.print(value);
+      }
+      // The printer sits on an in-memory StringWriter, so there is nothing to close.
+      String line = sink.toString();
+      return ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
new file mode 100644
index 0000000..d288fa1
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.json;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class JsonScheme implements Scheme {
+  private final List<String> fields;
+
+  public JsonScheme(List<String> fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      @SuppressWarnings("unchecked")
+      HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+      ArrayList<Object> list = new ArrayList<>(fields.size());
+      for (String f : fields) {
+        list.add(map.get(f));
+      }
+      return list;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
new file mode 100644
index 0000000..1e825c4
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class JsonSerializer implements IOutputSerializer, Serializable {
+  private final List<String> fieldNames;
+  private final JsonFactory jsonFactory;
+
+  public JsonSerializer(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+    jsonFactory = new JsonFactory();
+  }
+
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+    StringWriter sw = new StringWriter();
+    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+      jg.writeStartObject();
+      for (int i = 0; i < fieldNames.size(); ++i) {
+        jg.writeFieldName(fieldNames.get(i));
+        jg.writeObject(data.get(i));
+      }
+      jg.writeEndObject();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
new file mode 100644
index 0000000..310494c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.tsv;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TsvScheme uses a simple delimited format implementation by splitting the string,
+ * and it supports a user-defined delimiter.
+ */
+public class TsvScheme implements Scheme {
+  private final List<String> fieldNames;
+  private final char delimiter;
+
+  public TsvScheme(List<String> fieldNames, char delimiter) {
+    this.fieldNames = fieldNames;
+    this.delimiter = delimiter;
+  }
+
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+    List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
+    Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
+
+    ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+    list.addAll(parts);
+    return list;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fieldNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
new file mode 100644
index 0000000..1cf1c76
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.tsv;
+
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * TsvSerializer uses a simple delimited format implementation by joining the values
+ * with the delimiter, and it supports a user-defined delimiter.
+ */
+public class TsvSerializer implements IOutputSerializer, Serializable {
+  private final List<String> fields; //reserved for future
+  private final char delimiter;
+
+  public TsvSerializer(List<String> fields, char delimiter) {
+    this.fields = fields;
+    this.delimiter = delimiter;
+    }
+
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
+    for (int i = 0; i < data.size(); i++) {
+      Object o = data.get(i);
+      if (i == 0) {
+        sb.append(o);
+      } else {
+        sb.append(delimiter);
+        sb.append(o);
+      }
+    }
+    return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
new file mode 100644
index 0000000..6c76481
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EvaluationCalc implements OperationAwareFlatMapFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
+
+    private final ExecutableExpression filterInstance;
+    private final ExecutableExpression projectionInstance;
+    private final Object[] outputValues;
+    private final DataContext dataContext;
+
+    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+        this.filterInstance = filterInstance;
+        this.projectionInstance = projectionInstance;
+        this.outputValues = new Object[outputCount];
+        this.dataContext = dataContext;
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+        }
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public void cleanup() {
+
+    }
+
+    @Override
+    public Iterable<Values> execute(TridentTuple input) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.getValues().toArray();
+
+        if (filterInstance != null) {
+            filterInstance.execute(calciteContext, outputValues);
+            // filtered out
+            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
+                return Collections.emptyList();
+            }
+        }
+
+        if (projectionInstance != null) {
+            projectionInstance.execute(calciteContext, outputValues);
+            return Collections.singletonList(new Values(outputValues));
+        } else {
+            return Collections.singletonList(new Values(input.getValues()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
new file mode 100644
index 0000000..9314852
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EvaluationFilter extends BaseFilter {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+    private final ExecutableExpression filterInstance;
+    private final DataContext dataContext;
+    private final Object[] outputValues;
+
+    public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
+        this.filterInstance = filterInstance;
+        this.dataContext = dataContext;
+        this.outputValues = new Object[1];
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = tuple.getValues().toArray();
+        filterInstance.execute(calciteContext, outputValues);
+        return (outputValues[0] != null && (boolean) outputValues[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
new file mode 100644
index 0000000..2608104
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.OperationAwareMapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EvaluationFunction implements OperationAwareMapFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+    private final ExecutableExpression projectionInstance;
+    private final Object[] outputValues;
+    private final DataContext dataContext;
+
+    public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+        this.projectionInstance = projectionInstance;
+        this.outputValues = new Object[outputCount];
+        this.dataContext = dataContext;
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        if (projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public void cleanup() {
+
+    }
+
+    @Override
+    public Values execute(TridentTuple input) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.getValues().toArray();
+        projectionInstance.execute(calciteContext, outputValues);
+        return new Values(outputValues);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
new file mode 100644
index 0000000..4c3a266
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ForwardFunction extends BaseFunction {
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        collector.emit(tuple.getValues());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
new file mode 100644
index 0000000..efd5d25
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.utils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.storm.sql.runtime.FieldInfo;
+
+import java.io.Serializable;
+import java.util.List;
+
+public final class FieldInfoUtils {
+
+    public static List<String> getFieldNames(List<FieldInfo> fields) {
+        return Lists.transform(fields, new FieldNameExtractor());
+    }
+
+    private static class FieldNameExtractor implements Function<FieldInfo, String>, Serializable {
+        @Override
+        public String apply(FieldInfo fieldInfo) {
+            return fieldInfo.name();
+        }
+    }
+}