You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:05 UTC
[02/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
new file mode 100644
index 0000000..3b5eedd
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+public class Channels {
+ private static final ChannelContext VOID_CTX = new ChannelContext() {
+ @Override
+ public void emit(Values data) {}
+
+ @Override
+ public void fireChannelInactive() {}
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void setSource(java.lang.Object source) {
+
+ }
+ };
+
+ private static class ChannelContextAdapter implements ChannelContext {
+ private final ChannelHandler handler;
+ private final ChannelContext next;
+
+ public ChannelContextAdapter(
+ ChannelContext next, ChannelHandler handler) {
+ this.handler = handler;
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ handler.dataReceived(next, data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ handler.channelInactive(next);
+ }
+
+ @Override
+ public void flush() {
+ handler.flush(next);
+ }
+
+ @Override
+ public void setSource(java.lang.Object source) {
+ handler.setSource(next, source);
+ next.setSource(source); // propagate through the chain
+ }
+ }
+
+ private static class ForwardingChannelContext implements ChannelContext {
+ private final ChannelContext next;
+
+ public ForwardingChannelContext(ChannelContext next) {
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ next.emit(data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ next.fireChannelInactive();
+ }
+
+ @Override
+ public void flush() {
+ next.flush();
+ }
+
+ @Override
+ public void setSource(Object source) {
+ next.setSource(source);
+ }
+ }
+
+ public static ChannelContext chain(
+ ChannelContext next, ChannelHandler handler) {
+ return new ChannelContextAdapter(next, handler);
+ }
+
+ public static ChannelContext voidContext() {
+ return VOID_CTX;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
new file mode 100644
index 0000000..352af73
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuple to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+ void open(ChannelContext ctx);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
new file mode 100644
index 0000000..dbece9c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+public interface DataSourcesProvider {
+ /**
+ * @return the scheme of the data source
+ */
+ String scheme();
+
+ /**
+ * Construct a new data source.
+ * @param uri The URI that specifies the data source. The format of the URI
+ * is fully customizable.
+ * @param inputFormatClass the name of the class that deserializes data.
+ * It is null when unspecified.
+ * @param outputFormatClass the name of the class that serializes data. It
+ * is null when unspecified.
+ * @param fields The name of the fields and the schema of the table.
+ */
+ DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields);
+
+ ISqlTridentDataSource constructTrident(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
new file mode 100644
index 0000000..dfefb01
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+public class DataSourcesRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ DataSourcesRegistry.class);
+ private static final Map<String, DataSourcesProvider> providers;
+
+ static {
+ providers = new HashMap<>();
+ ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+ DataSourcesProvider.class);
+ for (DataSourcesProvider p : loader) {
+ LOG.info("Registering scheme {} with {}", p.scheme(), p);
+ providers.put(p.scheme(), p);
+ }
+ }
+
+ private DataSourcesRegistry() {
+ }
+
+ public static DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ DataSourcesProvider provider = providers.get(uri.getScheme());
+ if (provider == null) {
+ return null;
+ }
+
+ return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
+ }
+
+ public static ISqlTridentDataSource constructTridentDataSource(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ DataSourcesProvider provider = providers.get(uri.getScheme());
+ if (provider == null) {
+ return null;
+ }
+
+ return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+ }
+
+ /**
+ * Allow unit tests to inject data sources.
+ */
+ public static Map<String, DataSourcesProvider> providerMap() {
+ return providers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
new file mode 100644
index 0000000..03b030b
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.io.Serializable;
+
+/**
+ * Describe each column of the field
+ */
+public class FieldInfo implements Serializable {
+ private final String name;
+ private final Class<?> type;
+ private final boolean isPrimary;
+
+ public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+ this.name = name;
+ this.type = type;
+ this.isPrimary = isPrimary;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public Class<?> type() {
+ return type;
+ }
+
+ public boolean isPrimary() {
+ return isPrimary;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
new file mode 100644
index 0000000..b6670d9
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface IOutputSerializer {
+ /**
+ * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+ * memory.
+ *
+ * @return A ByteBuffer contains the serialized result.
+ */
+ ByteBuffer write(List<Object> data, ByteBuffer buffer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
new file mode 100644
index 0000000..9eae5ae
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+/**
+ * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlTridentDataSource {
+ /**
+ * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
+ *
+ * Please note that StateFactory and StateUpdater should use same class which implements State.
+ *
+ * @see org.apache.storm.trident.state.StateFactory
+ * @see org.apache.storm.trident.state.StateUpdater
+ */
+ interface SqlTridentConsumer {
+ StateFactory getStateFactory();
+ StateUpdater getStateUpdater();
+ }
+
+ /**
+ * Provides instance of ITridentDataSource which can be used as producer in Trident.
+ *
+ * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
+ * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
+ * - IBatchSpout
+ * - ITridentSpout
+ * - IPartitionedTridentSpout
+ * - IOpaquePartitionedTridentSpout
+ *
+ * @see org.apache.storm.trident.spout.ITridentDataSource
+ * @see org.apache.storm.trident.spout.IBatchSpout
+ * @see org.apache.storm.trident.spout.ITridentSpout
+ * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
+ * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
+ */
+ ITridentDataSource getProducer();
+
+ /**
+ * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
+ *
+ * @see SqlTridentConsumer
+ */
+ SqlTridentConsumer getConsumer();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
new file mode 100644
index 0000000..c9abd16
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer {
+ private final StateFactory stateFactory;
+ private final StateUpdater stateUpdater;
+
+ public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) {
+ this.stateFactory = stateFactory;
+ this.stateUpdater = stateUpdater;
+ }
+
+ @Override
+ public StateFactory getStateFactory() {
+ return stateFactory;
+ }
+
+ @Override
+ public StateUpdater getStateUpdater() {
+ return stateUpdater;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
new file mode 100644
index 0000000..a373483
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+public class StormSqlFunctions {
+ public static Boolean eq(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return b0.equals(b1);
+ }
+
+ public static Boolean ne(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return !b0.equals(b1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
new file mode 100644
index 0000000..e78f354
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/DebuggableExecutableExpression.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+public class DebuggableExecutableExpression implements ExecutableExpression {
+ private ExecutableExpression delegate;
+ private String delegateCode;
+
+ public DebuggableExecutableExpression(ExecutableExpression delegate, String delegateCode) {
+ this.delegate = delegate;
+ this.delegateCode = delegateCode;
+ }
+
+ @Override
+ public Object execute(Context context) {
+ return delegate.execute(context);
+ }
+
+ @Override
+ public void execute(Context context, Object[] results) {
+ delegate.execute(context, results);
+ }
+
+ public String getDelegateCode() {
+ return delegateCode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
new file mode 100644
index 0000000..8416945
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.calcite;
+
+import org.apache.calcite.interpreter.Context;
+
+import java.io.Serializable;
+
+/**
+ * Compiled executable expression.
+ */
+public interface ExecutableExpression extends Serializable {
+ Object execute(Context context);
+ void execute(Context context, Object[] results);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
new file mode 100644
index 0000000..4861b43
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.calcite;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Holder;
+
+import java.io.Serializable;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * This is based on SlimDataContext in Calcite, and borrow some from DataContextImpl in Calcite.
+ */
+public class StormDataContext implements DataContext, Serializable {
+ private final ImmutableMap<Object, Object> map;
+
+ public StormDataContext() {
+ // Store the time at which the query started executing. The SQL
+ // standard says that functions such as CURRENT_TIMESTAMP return the
+ // same value throughout the query.
+
+ final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis());
+
+ // Give a hook chance to alter the clock.
+ Hook.CURRENT_TIME.run(timeHolder);
+ final long time = timeHolder.get();
+ final TimeZone timeZone = Calendar.getInstance().getTimeZone();
+ final long localOffset = timeZone.getOffset(time);
+ final long currentOffset = localOffset;
+
+ ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
+ builder.put(Variable.UTC_TIMESTAMP.camelName, time)
+ .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset)
+ .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
+ .put(Variable.TIME_ZONE.camelName, timeZone);
+ map = builder.build();
+ }
+
+ @Override
+ public SchemaPlus getRootSchema() {
+ return null;
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory() {
+ return null;
+ }
+
+ @Override
+ public QueryProvider getQueryProvider() {
+ return null;
+ }
+
+ @Override
+ public Object get(String name) {
+ return map.get(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
new file mode 100644
index 0000000..0e65220
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
+import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
+import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a Socket data source based on the URI and properties. The URI has the format of
+ * socket://[host]:[port]. Both of host and port are mandatory.
+ *
+ * Note that it connects to given host and port, and receive the message if it's used for input source,
+ * and send the message if it's used for output data source.
+ */
+public class SocketDataSourcesProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "socket";
+ }
+
+ private static class SocketTridentDataSource implements ISqlTridentDataSource {
+
+ private final String host;
+ private final int port;
+ private final Scheme scheme;
+ private final IOutputSerializer serializer;
+
+ SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
+ this.scheme = scheme;
+ this.serializer = serializer;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public ITridentDataSource getProducer() {
+ return new TridentSocketSpout(scheme, host, port);
+ }
+
+ @Override
+ public SqlTridentConsumer getConsumer() {
+ StateFactory stateFactory = new SocketState.Factory(host, port);
+ StateUpdater<SocketState> stateUpdater = new SocketStateUpdater(serializer);
+ return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+ }
+ }
+
+ @Override
+ public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
+ String host = uri.getHost();
+ int port = uri.getPort();
+ if (port == -1) {
+ throw new RuntimeException("Port information is not available. URI: " + uri);
+ }
+
+ List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+ Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
+ IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+
+ return new SocketTridentDataSource(scheme, serializer, host, port);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
new file mode 100644
index 0000000..3f85756
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+/**
+ * Trident State implementation of Socket. It only supports writing.
+ */
+public class SocketState implements State {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void beginCommit(Long txid) {
+ // no-op
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void commit(Long txid) {
+ // no-op
+ }
+
+ public static class Factory implements StateFactory {
+ private final String host;
+ private final int port;
+
+ public Factory(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ BufferedWriter out;
+ try {
+ Socket socket = new Socket(host, port);
+ out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
+ } catch (IOException e) {
+ throw new RuntimeException("Exception while initializing socket for State. host " +
+ host + " port " + port, e);
+ }
+
+ // State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway
+ return new SocketState(out);
+ }
+ }
+
+ private BufferedWriter out;
+
+ private SocketState(BufferedWriter out) {
+ this.out = out;
+ }
+
+ public void write(String str) throws IOException {
+ out.write(str);
+ }
+
+ public void flush() throws IOException {
+ out.flush();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
new file mode 100644
index 0000000..3062a90
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
+ */
+public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
+ private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
+
+ private final IOutputSerializer outputSerializer;
+
+ public SocketStateUpdater(IOutputSerializer outputSerializer) {
+ this.outputSerializer = outputSerializer;
+ }
+
+ @Override
+ public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
+ try {
+ for (TridentTuple tuple : tuples) {
+ byte[] array = outputSerializer.write(tuple.getValues(), null).array();
+ String data = new String(array);
+ state.write(data + "\n");
+ }
+ state.flush();
+ } catch (IOException e) {
+ LOG.error("Error while updating state.", e);
+ collector.reportError(e);
+ throw new RuntimeException("Error while updating state.", e);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
new file mode 100644
index 0000000..97f63a7
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.trident;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
+ */
+public class TridentSocketSpout implements IBatchSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
+
+ private final String host;
+ private final int port;
+ private final Scheme scheme;
+
+ private volatile boolean _running = true;
+
+ private BlockingDeque<String> queue;
+ private Socket socket;
+ private Thread readerThread;
+ private BufferedReader in;
+ private ObjectMapper objectMapper;
+
+ private Map<Long, List<List<Object>>> batches;
+
+ public TridentSocketSpout(Scheme scheme, String host, int port) {
+ this.scheme = scheme;
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ this.queue = new LinkedBlockingDeque<>();
+ this.objectMapper = new ObjectMapper();
+ this.batches = new HashMap<>();
+
+ try {
+ socket = new Socket(host, port);
+ in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ } catch (IOException e) {
+ throw new RuntimeException("Error opening socket: host " + host + " port " + port);
+ }
+
+ readerThread = new Thread(new SocketReaderRunnable());
+ readerThread.start();
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ // read line and parse it to json
+ List<List<Object>> batch = this.batches.get(batchId);
+ if (batch == null) {
+ batch = new ArrayList<>();
+
+ while(queue.peek() != null) {
+ String line = queue.poll();
+ List<Object> values = convertLineToTuple(line);
+ if (values == null) {
+ continue;
+ }
+
+ batch.add(values);
+ }
+
+ this.batches.put(batchId, batch);
+ }
+
+ for (List<Object> list : batch) {
+ collector.emit(list);
+ }
+ }
+
+ private List<Object> convertLineToTuple(String line) {
+ return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
+ }
+
+ @Override
+ public void ack(long batchId) {
+ this.batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+ _running = false;
+ readerThread.interrupt();
+ queue.clear();
+
+ closeQuietly(in);
+ closeQuietly(socket);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return scheme.getOutputFields();
+ }
+
+ private class SocketReaderRunnable implements Runnable {
+ public void run() {
+ while (_running) {
+ try {
+ String line = in.readLine();
+ if (line == null) {
+ throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
+ }
+
+ queue.push(line.trim());
+ } catch (Throwable t) {
+ // This spout is added to test purpose, so just failing fast doesn't hurt much
+ die(t);
+ }
+ }
+ }
+ }
+
+ private void die(Throwable t) {
+ LOG.error("Halting process: TridentSocketSpout died.", t);
+ if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+ System.exit(11);
+ }
+ }
+
+ private void closeQuietly(final Closeable closeable) {
+ try {
+ if (closeable != null) {
+ closeable.close();
+ }
+ } catch (final IOException ioe) {
+ // ignore
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
new file mode 100644
index 0000000..3bf1a23
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AvroScheme uses generic(without code generation) instead of specific(with code generation) readers.
+ */
+public class AvroScheme implements Scheme {
+ private final String schemaString;
+ private final List<String> fieldNames;
+ private final CachedSchemas schemas;
+
+ public AvroScheme(String schemaString, List<String> fieldNames) {
+ this.schemaString = schemaString;
+ this.fieldNames = fieldNames;
+ this.schemas = new CachedSchemas();
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ try {
+ Schema schema = schemas.getSchema(schemaString);
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
+ GenericRecord record = reader.read(null, decoder);
+
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ for (String field : fieldNames) {
+ Object value = record.get(field);
+ // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
+ list.add(SerdeUtils.convertAvroUtf8(value));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
new file mode 100644
index 0000000..5dc3393
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * AvroSerializer uses generic(without code generation) instead of specific(with code generation) writers.
+ */
+public class AvroSerializer implements IOutputSerializer, Serializable {
+ private final String schemaString;
+ private final List<String> fieldNames;
+ private final CachedSchemas schemas;
+
+ public AvroSerializer(String schemaString, List<String> fieldNames) {
+ this.schemaString = schemaString;
+ this.fieldNames = fieldNames;
+ this.schemas = new CachedSchemas();
+ }
+
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
+ try {
+ Schema schema = schemas.getSchema(schemaString);
+ GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < fieldNames.size(); i++) {
+ record.put(fieldNames.get(i), data.get(i));
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
+ Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+ writer.write(record, encoder);
+ encoder.flush();
+ byte[] bytes = out.toByteArray();
+ out.close();
+ return ByteBuffer.wrap(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
new file mode 100644
index 0000000..4f0e747
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+// TODO this class is reserved for supporting messages with different schemas.
+// current only one schema in the cache
+public class CachedSchemas implements Serializable{
+
+ private final Map<String, Schema> cache = new HashMap<>();
+
+ public Schema getSchema(String schemaString) {
+ Schema schema = cache.get(schemaString);
+ if (schema == null) {
+ schema = new Schema.Parser().parse(schemaString);
+ cache.put(schemaString, schema);
+ }
+ return schema;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
new file mode 100644
index 0000000..34fb1bb
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.csv;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CsvScheme uses the standard RFC4180 CSV Parser
+ * One of the difference from Tsv format is that fields with embedded commas will be quoted.
+ * eg: a,"b,c",d is allowed.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
+ */
+public class CsvScheme implements Scheme {
+ private final List<String> fieldNames;
+
+ public CsvScheme(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ try {
+ String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+ CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
+ CSVRecord record = parser.getRecords().get(0);
+ Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
+
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ for (int i = 0; i < record.size(); i++) {
+ list.add(record.get(i));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
new file mode 100644
index 0000000..0d3bd74
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * CsvSerializer uses the standard RFC4180 CSV Parser
+ * One of the difference from Tsv format is that fields with embedded commas will be quoted.
+ * eg: a,"b,c",d is allowed.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
+ */
+public class CsvSerializer implements IOutputSerializer, Serializable {
+ private final List<String> fields; //reserved for future
+
+ public CsvSerializer(List<String> fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ try {
+ StringWriter writer = new StringWriter();
+ CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
+ for (Object o : data) {
+ printer.print(o);
+ }
+ //since using StringWriter, we do not need to close it.
+ return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
new file mode 100644
index 0000000..d288fa1
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.json;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class JsonScheme implements Scheme {
+ private final List<String> fields;
+
+ public JsonScheme(List<String> fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+ ArrayList<Object> list = new ArrayList<>(fields.size());
+ for (String f : fields) {
+ list.add(map.get(f));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
new file mode 100644
index 0000000..1e825c4
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class JsonSerializer implements IOutputSerializer, Serializable {
+ private final List<String> fieldNames;
+ private final JsonFactory jsonFactory;
+
+ public JsonSerializer(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ jsonFactory = new JsonFactory();
+ }
+
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+ StringWriter sw = new StringWriter();
+ try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+ jg.writeStartObject();
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ jg.writeFieldName(fieldNames.get(i));
+ jg.writeObject(data.get(i));
+ }
+ jg.writeEndObject();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
new file mode 100644
index 0000000..310494c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.tsv;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TsvScheme uses a simple delimited format implemention by splitting string,
+ * and it supports user defined delimiter.
+ */
+public class TsvScheme implements Scheme {
+ private final List<String> fieldNames;
+ private final char delimiter;
+
+ public TsvScheme(List<String> fieldNames, char delimiter) {
+ this.fieldNames = fieldNames;
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+ List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
+ Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
+
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ list.addAll(parts);
+ return list;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
new file mode 100644
index 0000000..1cf1c76
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.tsv;
+
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * TsvSerializer uses a simple delimited format implemention by splitting string,
+ * and it supports user defined delimiter.
+ */
+public class TsvSerializer implements IOutputSerializer, Serializable {
+ private final List<String> fields; //reserved for future
+ private final char delimiter;
+
+ public TsvSerializer(List<String> fields, char delimiter) {
+ this.fields = fields;
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
+ for (int i = 0; i < data.size(); i++) {
+ Object o = data.get(i);
+ if (i == 0) {
+ sb.append(o);
+ } else {
+ sb.append(delimiter);
+ sb.append(o);
+ }
+ }
+ return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
new file mode 100644
index 0000000..6c76481
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EvaluationCalc implements OperationAwareFlatMapFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
+
+ private final ExecutableExpression filterInstance;
+ private final ExecutableExpression projectionInstance;
+ private final Object[] outputValues;
+ private final DataContext dataContext;
+
+ public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+ this.filterInstance = filterInstance;
+ this.projectionInstance = projectionInstance;
+ this.outputValues = new Object[outputCount];
+ this.dataContext = dataContext;
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+ }
+ if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ @Override
+ public Iterable<Values> execute(TridentTuple input) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = input.getValues().toArray();
+
+ if (filterInstance != null) {
+ filterInstance.execute(calciteContext, outputValues);
+ // filtered out
+ if (outputValues[0] == null || !((Boolean) outputValues[0])) {
+ return Collections.emptyList();
+ }
+ }
+
+ if (projectionInstance != null) {
+ projectionInstance.execute(calciteContext, outputValues);
+ return Collections.singletonList(new Values(outputValues));
+ } else {
+ return Collections.singletonList(new Values(input.getValues()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
new file mode 100644
index 0000000..9314852
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EvaluationFilter extends BaseFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+ private final ExecutableExpression filterInstance;
+ private final DataContext dataContext;
+ private final Object[] outputValues;
+
+ public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
+ this.filterInstance = filterInstance;
+ this.dataContext = dataContext;
+ this.outputValues = new Object[1];
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = tuple.getValues().toArray();
+ filterInstance.execute(calciteContext, outputValues);
+ return (outputValues[0] != null && (boolean) outputValues[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
new file mode 100644
index 0000000..2608104
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.trident.operation.OperationAwareMapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EvaluationFunction implements OperationAwareMapFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+ private final ExecutableExpression projectionInstance;
+ private final Object[] outputValues;
+ private final DataContext dataContext;
+
+ public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+ this.projectionInstance = projectionInstance;
+ this.outputValues = new Object[outputCount];
+ this.dataContext = dataContext;
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ if (projectionInstance instanceof DebuggableExecutableExpression) {
+ LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+ }
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ @Override
+ public Values execute(TridentTuple input) {
+ Context calciteContext = new StormContext(dataContext);
+ calciteContext.values = input.getValues().toArray();
+ projectionInstance.execute(calciteContext, outputValues);
+ return new Values(outputValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
new file mode 100644
index 0000000..4c3a266
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ForwardFunction extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ collector.emit(tuple.getValues());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
new file mode 100644
index 0000000..efd5d25
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.utils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.storm.sql.runtime.FieldInfo;
+
+import java.io.Serializable;
+import java.util.List;
+
+public final class FieldInfoUtils {
+
+ public static List<String> getFieldNames(List<FieldInfo> fields) {
+ return Lists.transform(fields, new FieldNameExtractor());
+ }
+
+ private static class FieldNameExtractor implements Function<FieldInfo, String>, Serializable {
+ @Override
+ public String apply(FieldInfo fieldInfo) {
+ return fieldInfo.name();
+ }
+ }
+}