You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/07/16 11:13:18 UTC
[1/2] storm git commit: STORM-2634: Apply new code style to
storm-sql-runtime
Repository: storm
Updated Branches:
refs/heads/master 2fc414d00 -> 72e219c15
STORM-2634: Apply new code style to storm-sql-runtime
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2a36ec7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2a36ec7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2a36ec7b
Branch: refs/heads/master
Commit: 2a36ec7bbe88d61685886b9de291b7fcc027b899
Parents: d7c7818
Author: Xin Wang <be...@163.com>
Authored: Sat Jul 15 16:10:39 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sat Jul 15 16:10:39 2017 +0800
----------------------------------------------------------------------
sql/storm-sql-runtime/pom.xml | 3 -
.../calcite/interpreter/StormContext.java | 5 +-
.../sql/runtime/AbstractChannelHandler.java | 43 +++---
.../sql/runtime/AbstractValuesProcessor.java | 23 +--
.../storm/sql/runtime/ChannelContext.java | 18 ++-
.../storm/sql/runtime/ChannelHandler.java | 21 +--
.../org/apache/storm/sql/runtime/Channels.java | 149 ++++++++++---------
.../apache/storm/sql/runtime/DataSource.java | 3 +-
.../storm/sql/runtime/DataSourcesProvider.java | 42 +++---
.../storm/sql/runtime/DataSourcesRegistry.java | 96 +++++++-----
.../org/apache/storm/sql/runtime/FieldInfo.java | 43 +++---
.../storm/sql/runtime/IOutputSerializer.java | 15 +-
.../sql/runtime/ISqlTridentDataSource.java | 72 ++++-----
.../sql/runtime/SimpleSqlTridentConsumer.java | 1 +
.../storm/sql/runtime/StormSqlFunctions.java | 34 +++--
.../runtime/calcite/ExecutableExpression.java | 5 +-
.../sql/runtime/calcite/StormDataContext.java | 13 +-
.../socket/SocketDataSourcesProvider.java | 13 +-
.../datasource/socket/trident/SocketState.java | 12 +-
.../socket/trident/SocketStateUpdater.java | 6 +-
.../socket/trident/TridentSocketSpout.java | 33 ++--
.../sql/runtime/serde/avro/AvroScheme.java | 74 ++++-----
.../sql/runtime/serde/avro/AvroSerializer.java | 75 +++++-----
.../sql/runtime/serde/avro/CachedSchemas.java | 12 +-
.../storm/sql/runtime/serde/csv/CsvScheme.java | 60 ++++----
.../sql/runtime/serde/csv/CsvSerializer.java | 39 ++---
.../sql/runtime/serde/json/JsonScheme.java | 56 +++----
.../sql/runtime/serde/json/JsonSerializer.java | 46 +++---
.../storm/sql/runtime/serde/tsv/TsvScheme.java | 54 +++----
.../sql/runtime/serde/tsv/TsvSerializer.java | 41 ++---
.../trident/functions/EvaluationCalc.java | 17 ++-
.../trident/functions/EvaluationFilter.java | 10 +-
.../trident/functions/EvaluationFunction.java | 11 +-
.../trident/functions/ForwardFunction.java | 1 +
.../storm/sql/runtime/utils/FieldInfoUtils.java | 4 +-
.../storm/sql/runtime/utils/SerdeUtils.java | 41 +++--
.../apache/storm/sql/runtime/utils/Utils.java | 21 +--
37 files changed, 677 insertions(+), 535 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/pom.xml b/sql/storm-sql-runtime/pom.xml
index 511c792..f9969b2 100644
--- a/sql/storm-sql-runtime/pom.xml
+++ b/sql/storm-sql-runtime/pom.xml
@@ -135,9 +135,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
- <configuration>
- <maxAllowedViolations>485</maxAllowedViolations>
- </configuration>
</plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
index aa7e435..700284c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.calcite.interpreter;
-import org.apache.calcite.DataContext;
+package org.apache.calcite.interpreter;
import java.io.Serializable;
+import org.apache.calcite.DataContext;
+
/**
* This is a hack to use Calcite Context.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
index 64be39d..effdf55 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -15,38 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.tuple.Values;
public abstract class AbstractChannelHandler implements ChannelHandler {
- @Override
- public abstract void dataReceived(ChannelContext ctx, Values data);
-
- @Override
- public void channelInactive(ChannelContext ctx) {
-
- }
+ @Override
+ public abstract void dataReceived(ChannelContext ctx, Values data);
- @Override
- public void exceptionCaught(Throwable cause) {
+ @Override
+ public void channelInactive(ChannelContext ctx) {
- }
+ }
- @Override
- public void flush(ChannelContext ctx) {
- ctx.flush();
- }
+ @Override
+ public void exceptionCaught(Throwable cause) {
- @Override
- public void setSource(ChannelContext ctx, Object source) {
+ }
- }
+ @Override
+ public void flush(ChannelContext ctx) {
+ ctx.flush();
+ }
- public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
@Override
- public void dataReceived(ChannelContext ctx, Values data) {
- ctx.emit(data);
+ public void setSource(ChannelContext ctx, Object source) {
+
}
- };
+
+ public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+ @Override
+ public void dataReceived(ChannelContext ctx, Values data) {
+ ctx.emit(data);
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
index 6a853be..1449325 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.runtime;
-import org.apache.storm.tuple.Values;
+package org.apache.storm.sql.runtime;
import java.util.Map;
+import org.apache.storm.tuple.Values;
+
/**
* Subclass of AbstractTupleProcessor provides a series of tuple. It
* takes a series of iterators of {@link Values} and produces a stream of
* tuple.
- *
+ * <p/>
* The subclass implements the {@see next()} method to provide
* the output of the stream. It can choose to return null in {@see next()} to
* indicate that this particular iteration is a no-op. SQL processors depend
@@ -33,12 +34,12 @@ import java.util.Map;
*/
public abstract class AbstractValuesProcessor {
- /**
- * Initialize the data sources.
- *
- * @param data a map from the table name to the iterators of the values.
- *
- */
- public abstract void initialize(Map<String, DataSource> data, ChannelHandler
- result);
+ /**
+ * Initialize the data sources.
+ *
+ * @param data a map from the table name to the iterators of the values.
+ *
+ */
+ public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+ result);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
index 65ad01c..9d6f662 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -15,16 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.tuple.Values;
public interface ChannelContext {
- /**
- * Emit data to the next stage of the data pipeline.
- */
- void emit(Values data);
- void fireChannelInactive();
- void flush();
- void setSource(Object source);
+ /**
+ * Emit data to the next stage of the data pipeline.
+ */
+ void emit(Values data);
+
+ void fireChannelInactive();
+
+ void flush();
+
+ void setSource(Object source);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
index af02b7e..3009df4 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.tuple.Values;
@@ -24,18 +25,18 @@ import org.apache.storm.tuple.Values;
* series of events.
*/
public interface ChannelHandler {
- void dataReceived(ChannelContext ctx, Values data);
+ void dataReceived(ChannelContext ctx, Values data);
- /**
- * The producer of the data has indicated that the channel is no longer
- * active.
- * @param ctx
- */
- void channelInactive(ChannelContext ctx);
+ /**
+ * The producer of the data has indicated that the channel is no longer
+ * active.
+ * @param ctx ChannelContext
+ */
+ void channelInactive(ChannelContext ctx);
- void exceptionCaught(Throwable cause);
+ void exceptionCaught(Throwable cause);
- void flush(ChannelContext ctx);
+ void flush(ChannelContext ctx);
- void setSource(ChannelContext ctx, Object source);
+ void setSource(ChannelContext ctx, Object source);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
index 3b5eedd..d389c38 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -15,95 +15,96 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.tuple.Values;
public class Channels {
- private static final ChannelContext VOID_CTX = new ChannelContext() {
- @Override
- public void emit(Values data) {}
-
- @Override
- public void fireChannelInactive() {}
-
- @Override
- public void flush() {
-
- }
-
- @Override
- public void setSource(java.lang.Object source) {
-
- }
- };
-
- private static class ChannelContextAdapter implements ChannelContext {
- private final ChannelHandler handler;
- private final ChannelContext next;
-
- public ChannelContextAdapter(
- ChannelContext next, ChannelHandler handler) {
- this.handler = handler;
- this.next = next;
+ private static final ChannelContext VOID_CTX = new ChannelContext() {
+ @Override
+ public void emit(Values data) {}
+
+ @Override
+ public void fireChannelInactive() {}
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void setSource(java.lang.Object source) {
+
+ }
+ };
+
+ private static class ChannelContextAdapter implements ChannelContext {
+ private final ChannelHandler handler;
+ private final ChannelContext next;
+
+ public ChannelContextAdapter(
+ ChannelContext next, ChannelHandler handler) {
+ this.handler = handler;
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ handler.dataReceived(next, data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ handler.channelInactive(next);
+ }
+
+ @Override
+ public void flush() {
+ handler.flush(next);
+ }
+
+ @Override
+ public void setSource(java.lang.Object source) {
+ handler.setSource(next, source);
+ next.setSource(source); // propagate through the chain
+ }
}
- @Override
- public void emit(Values data) {
- handler.dataReceived(next, data);
- }
-
- @Override
- public void fireChannelInactive() {
- handler.channelInactive(next);
- }
-
- @Override
- public void flush() {
- handler.flush(next);
- }
+ private static class ForwardingChannelContext implements ChannelContext {
+ private final ChannelContext next;
- @Override
- public void setSource(java.lang.Object source) {
- handler.setSource(next, source);
- next.setSource(source); // propagate through the chain
- }
- }
+ public ForwardingChannelContext(ChannelContext next) {
+ this.next = next;
+ }
- private static class ForwardingChannelContext implements ChannelContext {
- private final ChannelContext next;
+ @Override
+ public void emit(Values data) {
+ next.emit(data);
+ }
- public ForwardingChannelContext(ChannelContext next) {
- this.next = next;
- }
+ @Override
+ public void fireChannelInactive() {
+ next.fireChannelInactive();
+ }
- @Override
- public void emit(Values data) {
- next.emit(data);
- }
+ @Override
+ public void flush() {
+ next.flush();
+ }
- @Override
- public void fireChannelInactive() {
- next.fireChannelInactive();
+ @Override
+ public void setSource(Object source) {
+ next.setSource(source);
+ }
}
- @Override
- public void flush() {
- next.flush();
+ public static ChannelContext chain(
+ ChannelContext next, ChannelHandler handler) {
+ return new ChannelContextAdapter(next, handler);
}
- @Override
- public void setSource(Object source) {
- next.setSource(source);
+ public static ChannelContext voidContext() {
+ return VOID_CTX;
}
- }
-
- public static ChannelContext chain(
- ChannelContext next, ChannelHandler handler) {
- return new ChannelContextAdapter(next, handler);
- }
-
- public static ChannelContext voidContext() {
- return VOID_CTX;
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
index 352af73..b90b4f5 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
/**
@@ -23,5 +24,5 @@ package org.apache.storm.sql.runtime;
*
*/
public interface DataSource {
- void open(ChannelContext ctx);
+ void open(ChannelContext ctx);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index dbece9c..baec6cd 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import java.net.URI;
@@ -22,26 +23,27 @@ import java.util.List;
import java.util.Properties;
public interface DataSourcesProvider {
- /**
- * @return the scheme of the data source
- */
- String scheme();
+ /**
+ * Get the scheme of the data source.
+ * @return the scheme of the data source
+ */
+ String scheme();
- /**
- * Construct a new data source.
- * @param uri The URI that specifies the data source. The format of the URI
- * is fully customizable.
- * @param inputFormatClass the name of the class that deserializes data.
- * It is null when unspecified.
- * @param outputFormatClass the name of the class that serializes data. It
- * is null when unspecified.
- * @param fields The name of the fields and the schema of the table.
- */
- DataSource construct(
- URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields);
+ /**
+ * Construct a new data source.
+ * @param uri The URI that specifies the data source. The format of the URI
+ * is fully customizable.
+ * @param inputFormatClass the name of the class that deserializes data.
+ * It is null when unspecified.
+ * @param outputFormatClass the name of the class that serializes data. It
+ * is null when unspecified.
+ * @param fields The name of the fields and the schema of the table.
+ */
+ DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields);
- ISqlTridentDataSource constructTrident(
- URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields);
+ ISqlTridentDataSource constructTrident(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index dfefb01..9d8368a 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -15,10 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.runtime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.sql.runtime;
import java.net.URI;
import java.util.HashMap;
@@ -27,50 +25,70 @@ import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class DataSourcesRegistry {
- private static final Logger LOG = LoggerFactory.getLogger(
- DataSourcesRegistry.class);
- private static final Map<String, DataSourcesProvider> providers;
+ private static final Logger LOG = LoggerFactory.getLogger(
+ DataSourcesRegistry.class);
+ private static final Map<String, DataSourcesProvider> providers;
- static {
- providers = new HashMap<>();
- ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
- DataSourcesProvider.class);
- for (DataSourcesProvider p : loader) {
- LOG.info("Registering scheme {} with {}", p.scheme(), p);
- providers.put(p.scheme(), p);
+ static {
+ providers = new HashMap<>();
+ ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+ DataSourcesProvider.class);
+ for (DataSourcesProvider p : loader) {
+ LOG.info("Registering scheme {} with {}", p.scheme(), p);
+ providers.put(p.scheme(), p);
+ }
}
- }
-
- private DataSourcesRegistry() {
- }
- public static DataSource construct(
- URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- DataSourcesProvider provider = providers.get(uri.getScheme());
- if (provider == null) {
- return null;
+ private DataSourcesRegistry() {
}
- return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
- }
+ /**
+ * Construct a data source.
+ * @param uri data source uri
+ * @param inputFormatClass input format class
+ * @param outputFormatClass output format class
+ * @param fields fields info list
+ * @return DataSource object
+ */
+ public static DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ DataSourcesProvider provider = providers.get(uri.getScheme());
+ if (provider == null) {
+ return null;
+ }
- public static ISqlTridentDataSource constructTridentDataSource(
- URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- DataSourcesProvider provider = providers.get(uri.getScheme());
- if (provider == null) {
- return null;
+ return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
}
- return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
- }
+ /**
+ * Construct a trident data source.
+ * @param uri data source uri
+ * @param inputFormatClass input format class
+ * @param outputFormatClass output format class
+ * @param properties Properties
+ * @param fields fields info list
+ * @return TridentDataSource object
+ */
+ public static ISqlTridentDataSource constructTridentDataSource(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ DataSourcesProvider provider = providers.get(uri.getScheme());
+ if (provider == null) {
+ return null;
+ }
- /**
- * Allow unit tests to inject data sources.
- */
- public static Map<String, DataSourcesProvider> providerMap() {
- return providers;
- }
+ return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+ }
+
+ /**
+ * Allow unit tests to inject data sources.
+ */
+ public static Map<String, DataSourcesProvider> providerMap() {
+ return providers;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
index 03b030b..c547851 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -15,33 +15,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import java.io.Serializable;
/**
- * Describe each column of the field
+ * Describe each column of the field.
*/
public class FieldInfo implements Serializable {
- private final String name;
- private final Class<?> type;
- private final boolean isPrimary;
+ private final String name;
+ private final Class<?> type;
+ private final boolean isPrimary;
- public FieldInfo(String name, Class<?> type, boolean isPrimary) {
- this.name = name;
- this.type = type;
- this.isPrimary = isPrimary;
- }
+ /**
+ * FieldInfo Constructor.
+ * @param name field name
+ * @param type filed type
+ * @param isPrimary primary or not
+ */
+ public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+ this.name = name;
+ this.type = type;
+ this.isPrimary = isPrimary;
+ }
- public String name() {
- return name;
- }
+ public String name() {
+ return name;
+ }
- public Class<?> type() {
- return type;
- }
+ public Class<?> type() {
+ return type;
+ }
- public boolean isPrimary() {
- return isPrimary;
- }
+ public boolean isPrimary() {
+ return isPrimary;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
index b6670d9..46d56b7 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import java.nio.ByteBuffer;
import java.util.List;
public interface IOutputSerializer {
- /**
- * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
- * memory.
- *
- * @return A ByteBuffer contains the serialized result.
- */
- ByteBuffer write(List<Object> data, ByteBuffer buffer);
+ /**
+ * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+ * memory.
+ *
+ * @return A ByteBuffer contains the serialized result.
+ */
+ ByteBuffer write(List<Object> data, ByteBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
index 9eae5ae..81fb386 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.trident.spout.ITridentDataSource;
@@ -25,41 +26,42 @@ import org.apache.storm.trident.state.StateUpdater;
* A ISqlTridentDataSource specifies how an external data source produces and consumes data.
*/
public interface ISqlTridentDataSource {
- /**
- * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
- *
- * Please note that StateFactory and StateUpdater should use same class which implements State.
- *
- * @see org.apache.storm.trident.state.StateFactory
- * @see org.apache.storm.trident.state.StateUpdater
- */
- interface SqlTridentConsumer {
- StateFactory getStateFactory();
- StateUpdater getStateUpdater();
- }
+ /**
+ * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
+ * <p/>
+ * Please note that StateFactory and StateUpdater should use same class which implements State.
+ *
+ * @see org.apache.storm.trident.state.StateFactory
+ * @see org.apache.storm.trident.state.StateUpdater
+ */
+ interface SqlTridentConsumer {
+ StateFactory getStateFactory();
+
+ StateUpdater getStateUpdater();
+ }
- /**
- * Provides instance of ITridentDataSource which can be used as producer in Trident.
- *
- * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
- * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
- * - IBatchSpout
- * - ITridentSpout
- * - IPartitionedTridentSpout
- * - IOpaquePartitionedTridentSpout
- *
- * @see org.apache.storm.trident.spout.ITridentDataSource
- * @see org.apache.storm.trident.spout.IBatchSpout
- * @see org.apache.storm.trident.spout.ITridentSpout
- * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
- * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
- */
- ITridentDataSource getProducer();
+ /**
+ * Provides instance of ITridentDataSource which can be used as producer in Trident.
+ * <p/>
+ * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
+ * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
+ * - IBatchSpout
+ * - ITridentSpout
+ * - IPartitionedTridentSpout
+ * - IOpaquePartitionedTridentSpout
+ *
+ * @see org.apache.storm.trident.spout.ITridentDataSource
+ * @see org.apache.storm.trident.spout.IBatchSpout
+ * @see org.apache.storm.trident.spout.ITridentSpout
+ * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
+ * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
+ */
+ ITridentDataSource getProducer();
- /**
- * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
- *
- * @see SqlTridentConsumer
- */
- SqlTridentConsumer getConsumer();
+ /**
+ * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
+ *
+ * @see SqlTridentConsumer
+ */
+ SqlTridentConsumer getConsumer();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
index c9abd16..f9a6039 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
import org.apache.storm.trident.state.StateFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
index a373483..095e6ba 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -15,20 +15,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime;
public class StormSqlFunctions {
- public static Boolean eq(Object b0, Object b1) {
- if (b0 == null || b1 == null) {
- return null;
+
+ /**
+ * Whether the object equals the other one.
+ * @param b0 one object
+ * @param b1 the other object
+ * @return true if the object equals the other one
+ */
+ public static Boolean eq(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return b0.equals(b1);
}
- return b0.equals(b1);
- }
- public static Boolean ne(Object b0, Object b1) {
- if (b0 == null || b1 == null) {
- return null;
+ /**
+ * Whether the object dose not equals the other one.
+ * @param b0 one object
+ * @param b1 the other object
+ * @return true if the object dose not equals the other one
+ */
+ public static Boolean ne(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return !b0.equals(b1);
}
- return !b0.equals(b1);
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
index 8416945..699a056 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
@@ -18,14 +18,15 @@
package org.apache.storm.sql.runtime.calcite;
-import org.apache.calcite.interpreter.Context;
-
import java.io.Serializable;
+import org.apache.calcite.interpreter.Context;
+
/**
* Compiled executable expression.
*/
public interface ExecutableExpression extends Serializable {
Object execute(Context context);
+
void execute(Context context, Object[] results);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
index 4861b43..6f506cc 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
@@ -15,9 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.calcite;
import com.google.common.collect.ImmutableMap;
+
+import java.io.Serializable;
+import java.util.Calendar;
+import java.util.TimeZone;
+
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.QueryProvider;
@@ -25,16 +31,15 @@ import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Holder;
-import java.io.Serializable;
-import java.util.Calendar;
-import java.util.TimeZone;
-
/**
* This is based on SlimDataContext in Calcite, and borrow some from DataContextImpl in Calcite.
*/
public class StormDataContext implements DataContext, Serializable {
private final ImmutableMap<Object, Object> map;
+ /**
+ * StormDataContext Constructor.
+ */
public StormDataContext() {
// Store the time at which the query started executing. The SQL
// standard says that functions such as CURRENT_TIMESTAMP return the
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
index 0e65220..fe4d024 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -18,6 +18,10 @@
package org.apache.storm.sql.runtime.datasource.socket;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesProvider;
@@ -34,14 +38,10 @@ import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateUpdater;
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
/**
* Create a Socket data source based on the URI and properties. The URI has the format of
* socket://[host]:[port]. Both of host and port are mandatory.
- *
+ * <p/>
* Note that it connects to given host and port, and receive the message if it's used for input source,
* and send the message if it's used for output data source.
*/
@@ -84,7 +84,8 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
String host = uri.getHost();
int port = uri.getPort();
if (port == -1) {
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
index c12f8bd..60c1a5e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
@@ -18,16 +18,16 @@
package org.apache.storm.sql.runtime.datasource.socket.trident;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.Map;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
/**
* Trident State implementation of Socket. It only supports writing.
*/
@@ -64,8 +64,8 @@ public class SocketState implements State {
Socket socket = new Socket(host, port);
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
} catch (IOException e) {
- throw new RuntimeException("Exception while initializing socket for State. host " +
- host + " port " + port, e);
+ throw new RuntimeException("Exception while initializing socket for State. host "
+ + host + " port " + port, e);
}
// State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
index 3062a90..df2893b 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
@@ -18,6 +18,9 @@
package org.apache.storm.sql.runtime.datasource.socket.trident;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
@@ -25,9 +28,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-
/**
* StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
index 1bd9251..aad42fb 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
@@ -19,14 +19,6 @@
package org.apache.storm.sql.runtime.datasource.socket.trident;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.Closeable;
@@ -41,6 +33,15 @@ import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
*/
@@ -51,7 +52,7 @@ public class TridentSocketSpout implements IBatchSpout {
private final int port;
private final Scheme scheme;
- private volatile boolean _running = true;
+ private volatile boolean running = true;
private BlockingDeque<String> queue;
private Socket socket;
@@ -61,6 +62,12 @@ public class TridentSocketSpout implements IBatchSpout {
private Map<Long, List<List<Object>>> batches;
+ /**
+ * TridentSocketSpout Constructor.
+ * @param scheme Scheme
+ * @param host socket host
+ * @param port socket port
+ */
public TridentSocketSpout(Scheme scheme, String host, int port) {
this.scheme = scheme;
this.host = host;
@@ -91,7 +98,7 @@ public class TridentSocketSpout implements IBatchSpout {
if (batch == null) {
batch = new ArrayList<>();
- while(queue.peek() != null) {
+ while (queue.peek() != null) {
String line = queue.poll();
List<Object> values = convertLineToTuple(line);
if (values == null) {
@@ -120,7 +127,7 @@ public class TridentSocketSpout implements IBatchSpout {
@Override
public void close() {
- _running = false;
+ running = false;
readerThread.interrupt();
queue.clear();
@@ -142,7 +149,7 @@ public class TridentSocketSpout implements IBatchSpout {
private class SocketReaderRunnable implements Runnable {
public void run() {
- while (_running) {
+ while (running) {
try {
String line = in.readLine();
if (line == null) {
@@ -160,7 +167,7 @@ public class TridentSocketSpout implements IBatchSpout {
private void die(Throwable t) {
LOG.error("Halting process: TridentSocketSpout died.", t);
- if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+ if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
System.exit(11);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
index 3bf1a23..147afe6 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
@@ -15,8 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.avro;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -28,47 +34,47 @@ import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* AvroScheme uses generic(without code generation) instead of specific(with code generation) readers.
*/
public class AvroScheme implements Scheme {
- private final String schemaString;
- private final List<String> fieldNames;
- private final CachedSchemas schemas;
+ private final String schemaString;
+ private final List<String> fieldNames;
+ private final CachedSchemas schemas;
- public AvroScheme(String schemaString, List<String> fieldNames) {
- this.schemaString = schemaString;
- this.fieldNames = fieldNames;
- this.schemas = new CachedSchemas();
- }
+ /**
+ * AvroScheme Constructor.
+ * @param schemaString schema string
+ * @param fieldNames field names
+ */
+ public AvroScheme(String schemaString, List<String> fieldNames) {
+ this.schemaString = schemaString;
+ this.fieldNames = fieldNames;
+ this.schemas = new CachedSchemas();
+ }
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- try {
- Schema schema = schemas.getSchema(schemaString);
- DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
- GenericRecord record = reader.read(null, decoder);
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ try {
+ Schema schema = schemas.getSchema(schemaString);
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
+ GenericRecord record = reader.read(null, decoder);
- ArrayList<Object> list = new ArrayList<>(fieldNames.size());
- for (String field : fieldNames) {
- Object value = record.get(field);
- // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
- list.add(SerdeUtils.convertAvroUtf8(value));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ for (String field : fieldNames) {
+ Object value = record.get(field);
+ // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
+ list.add(SerdeUtils.convertAvroUtf8(value));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- @Override
- public Fields getOutputFields() {
- return new Fields(fieldNames);
- }
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
index 5dc3393..6e4204e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
@@ -15,9 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.avro;
import com.google.common.base.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -27,46 +35,45 @@ import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.storm.sql.runtime.IOutputSerializer;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
/**
* AvroSerializer uses generic(without code generation) instead of specific(with code generation) writers.
*/
public class AvroSerializer implements IOutputSerializer, Serializable {
- private final String schemaString;
- private final List<String> fieldNames;
- private final CachedSchemas schemas;
+ private final String schemaString;
+ private final List<String> fieldNames;
+ private final CachedSchemas schemas;
- public AvroSerializer(String schemaString, List<String> fieldNames) {
- this.schemaString = schemaString;
- this.fieldNames = fieldNames;
- this.schemas = new CachedSchemas();
- }
+ /**
+ * AvroSerializer Constructor.
+ * @param schemaString schema string
+ * @param fieldNames field names
+ */
+ public AvroSerializer(String schemaString, List<String> fieldNames) {
+ this.schemaString = schemaString;
+ this.fieldNames = fieldNames;
+ this.schemas = new CachedSchemas();
+ }
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
- try {
- Schema schema = schemas.getSchema(schemaString);
- GenericRecord record = new GenericData.Record(schema);
- for (int i = 0; i < fieldNames.size(); i++) {
- record.put(fieldNames.get(i), data.get(i));
- }
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
+ try {
+ Schema schema = schemas.getSchema(schemaString);
+ GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < fieldNames.size(); i++) {
+ record.put(fieldNames.get(i), data.get(i));
+ }
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
- Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
- writer.write(record, encoder);
- encoder.flush();
- byte[] bytes = out.toByteArray();
- out.close();
- return ByteBuffer.wrap(bytes);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
+ Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+ writer.write(record, encoder);
+ encoder.flush();
+ byte[] bytes = out.toByteArray();
+ out.close();
+ return ByteBuffer.wrap(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
index fae917b..28f6e86 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
@@ -15,20 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.runtime.serde.avro;
-import org.apache.avro.Schema;
+package org.apache.storm.sql.runtime.serde.avro;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.avro.Schema;
+
// TODO this class is reserved for supporting messages with different schemas.
// current only one schema in the cache
-public class CachedSchemas implements Serializable{
+public class CachedSchemas implements Serializable {
private final Map<String, Schema> cache = new HashMap<>();
+ /**
+ * Get a schema based on schema string.
+ * @param schemaString schema string
+ * @return Schema object
+ */
public Schema getSchema(String schemaString) {
Schema schema = cache.get(schemaString);
if (schema == null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
index 34fb1bb..1bf3d4d 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
@@ -15,15 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.csv;
import com.google.common.base.Preconditions;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -31,6 +26,13 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
/**
* CsvScheme uses the standard RFC4180 CSV Parser
* One of the difference from Tsv format is that fields with embedded commas will be quoted.
@@ -39,32 +41,32 @@ import java.util.List;
* @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
*/
public class CsvScheme implements Scheme {
- private final List<String> fieldNames;
+ private final List<String> fieldNames;
- public CsvScheme(List<String> fieldNames) {
- this.fieldNames = fieldNames;
- }
+ public CsvScheme(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ }
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- try {
- String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
- CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
- CSVRecord record = parser.getRecords().get(0);
- Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ try {
+ String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+ CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
+ CSVRecord record = parser.getRecords().get(0);
+ Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
- ArrayList<Object> list = new ArrayList<>(fieldNames.size());
- for (int i = 0; i < record.size(); i++) {
- list.add(record.get(i));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ for (int i = 0; i < record.size(); i++) {
+ list.add(record.get(i));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- @Override
- public Fields getOutputFields() {
- return new Fields(fieldNames);
- }
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
index 0d3bd74..cbcdb3c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.runtime.serde.csv;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.storm.sql.runtime.IOutputSerializer;
+package org.apache.storm.sql.runtime.serde.csv;
import java.io.IOException;
import java.io.Serializable;
@@ -28,6 +25,10 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
/**
* CsvSerializer uses the standard RFC4180 CSV Parser
* One of the difference from Tsv format is that fields with embedded commas will be quoted.
@@ -36,24 +37,24 @@ import java.util.List;
* @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
*/
public class CsvSerializer implements IOutputSerializer, Serializable {
- private final List<String> fields; //reserved for future
+ private final List<String> fields; //reserved for future
- public CsvSerializer(List<String> fields) {
+ public CsvSerializer(List<String> fields) {
this.fields = fields;
}
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- try {
- StringWriter writer = new StringWriter();
- CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
- for (Object o : data) {
- printer.print(o);
- }
- //since using StringWriter, we do not need to close it.
- return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ try {
+ StringWriter writer = new StringWriter();
+ CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
+ for (Object o : data) {
+ printer.print(o);
+ }
+ //since using StringWriter, we do not need to close it.
+ return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
index d288fa1..db202fc 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
@@ -15,11 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.json;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@@ -28,31 +26,35 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
public class JsonScheme implements Scheme {
- private final List<String> fields;
-
- public JsonScheme(List<String> fields) {
- this.fields = fields;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- @SuppressWarnings("unchecked")
- HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
- ArrayList<Object> list = new ArrayList<>(fields.size());
- for (String f : fields) {
- list.add(map.get(f));
- }
- return list;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ private final List<String> fields;
+
+ public JsonScheme(List<String> fields) {
+ this.fields = fields;
}
- }
- @Override
- public Fields getOutputFields() {
- return new Fields(fields);
- }
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+ ArrayList<Object> list = new ArrayList<>(fields.size());
+ for (String f : fields) {
+ list.add(map.get(f));
+ }
+ return list;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fields);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
index 1e825c4..0f40fdf 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.json;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
import java.io.IOException;
import java.io.Serializable;
@@ -29,29 +29,31 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
public class JsonSerializer implements IOutputSerializer, Serializable {
- private final List<String> fieldNames;
- private final JsonFactory jsonFactory;
+ private final List<String> fieldNames;
+ private final JsonFactory jsonFactory;
- public JsonSerializer(List<String> fieldNames) {
- this.fieldNames = fieldNames;
- jsonFactory = new JsonFactory();
- }
+ public JsonSerializer(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ jsonFactory = new JsonFactory();
+ }
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
- StringWriter sw = new StringWriter();
- try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
- jg.writeStartObject();
- for (int i = 0; i < fieldNames.size(); ++i) {
- jg.writeFieldName(fieldNames.get(i));
- jg.writeObject(data.get(i));
- }
- jg.writeEndObject();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+ StringWriter sw = new StringWriter();
+ try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+ jg.writeStartObject();
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ jg.writeFieldName(fieldNames.get(i));
+ jg.writeObject(data.get(i));
+ }
+ jg.writeEndObject();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
}
- return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
index 310494c..7d9a896 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
@@ -15,44 +15,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.serde.tsv;
import com.google.common.base.Preconditions;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
/**
* TsvScheme uses a simple delimited format implemention by splitting string,
* and it supports user defined delimiter.
*/
public class TsvScheme implements Scheme {
- private final List<String> fieldNames;
- private final char delimiter;
-
- public TsvScheme(List<String> fieldNames, char delimiter) {
- this.fieldNames = fieldNames;
- this.delimiter = delimiter;
- }
-
- @Override
- public List<Object> deserialize(ByteBuffer ser) {
- String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
- List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
- Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
-
- ArrayList<Object> list = new ArrayList<>(fieldNames.size());
- list.addAll(parts);
- return list;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(fieldNames);
- }
+ private final List<String> fieldNames;
+ private final char delimiter;
+
+ public TsvScheme(List<String> fieldNames, char delimiter) {
+ this.fieldNames = fieldNames;
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public List<Object> deserialize(ByteBuffer ser) {
+ String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+ List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
+ Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
+
+ ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+ list.addAll(parts);
+ return list;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(fieldNames);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
index 1cf1c76..d5b41ac 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
@@ -15,40 +15,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.runtime.serde.tsv;
-import org.apache.storm.sql.runtime.IOutputSerializer;
+package org.apache.storm.sql.runtime.serde.tsv;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
/**
* TsvSerializer uses a simple delimited format implemention by splitting string,
* and it supports user defined delimiter.
*/
public class TsvSerializer implements IOutputSerializer, Serializable {
- private final List<String> fields; //reserved for future
- private final char delimiter;
+ private final List<String> fields; //reserved for future
+ private final char delimiter;
- public TsvSerializer(List<String> fields, char delimiter) {
- this.fields = fields;
- this.delimiter = delimiter;
+ public TsvSerializer(List<String> fields, char delimiter) {
+ this.fields = fields;
+ this.delimiter = delimiter;
}
- @Override
- public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
- StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
- for (int i = 0; i < data.size(); i++) {
- Object o = data.get(i);
- if (i == 0) {
- sb.append(o);
- } else {
- sb.append(delimiter);
- sb.append(o);
- }
+ @Override
+ public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+ StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
+ for (int i = 0; i < data.size(); i++) {
+ Object o = data.get(i);
+ if (i == 0) {
+ sb.append(o);
+ } else {
+ sb.append(delimiter);
+ sb.append(o);
+ }
+ }
+ return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
}
- return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
index 0318455..1b2f250 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
@@ -16,8 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.storm.sql.runtime.trident.functions;
+import java.util.Collections;
+import java.util.Map;
+
import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
@@ -30,9 +34,6 @@ import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.Map;
-
public class EvaluationCalc implements OperationAwareFlatMapFunction {
private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
@@ -41,7 +42,15 @@ public class EvaluationCalc implements OperationAwareFlatMapFunction {
private final Object[] outputValues;
private final DataContext dataContext;
- public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+ /**
+ * EvaluationCalc Constructor.
+ * @param filterInstance ExecutableExpression
+ * @param projectionInstance ExecutableExpression
+ * @param outputCount output count
+ * @param dataContext DataContext
+ */
+ public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
+ int outputCount, DataContext dataContext) {
this.filterInstance = filterInstance;
this.projectionInstance = projectionInstance;
this.outputValues = new Object[outputCount];
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
index 81e81aa..83e187c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.storm.sql.runtime.trident.functions;
+import java.util.Map;
+
import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
@@ -29,8 +32,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class EvaluationFilter extends BaseFilter {
private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
@@ -38,6 +39,11 @@ public class EvaluationFilter extends BaseFilter {
private final DataContext dataContext;
private final Object[] outputValues;
+ /**
+ * EvaluationFilter Constructor.
+ * @param filterInstance ExecutableExpression
+ * @param dataContext DataContext
+ */
public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
this.filterInstance = filterInstance;
this.dataContext = dataContext;
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
index 728d75e..340492d 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.storm.sql.runtime.trident.functions;
+import java.util.Map;
+
import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
@@ -30,8 +33,6 @@ import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class EvaluationFunction implements OperationAwareMapFunction {
private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
@@ -39,6 +40,12 @@ public class EvaluationFunction implements OperationAwareMapFunction {
private final Object[] outputValues;
private final DataContext dataContext;
+ /**
+ * EvaluationFunction Constructor.
+ * @param projectionInstance ExecutableExpression
+ * @param outputCount output count
+ * @param dataContext DataContext
+ */
public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
this.projectionInstance = projectionInstance;
this.outputValues = new Object[outputCount];
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
index 4c3a266..cb12ab3 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.storm.sql.runtime.trident.functions;
import org.apache.storm.trident.operation.BaseFunction;
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
index efd5d25..26017dd 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.utils;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.FieldInfo;
import java.io.Serializable;
import java.util.List;
+import org.apache.storm.sql.runtime.FieldInfo;
+
public final class FieldInfoUtils {
public static List<String> getFieldNames(List<FieldInfo> fields) {
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
index 2dcd66c..85c3500 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
@@ -15,11 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.utils;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.storm.spout.Scheme;
@@ -34,13 +42,14 @@ import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
import org.apache.storm.utils.ReflectionUtils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
public final class SerdeUtils {
+ /**
+ * Get a Scheme instance based on specific configurations.
+ * @param inputFormatClass input format class
+ * @param properties Properties
+ * @param fieldNames field names
+ * @return the Scheme instance
+ */
public static Scheme getScheme(String inputFormatClass, Properties properties, List<String> fieldNames) {
Scheme scheme;
if (isNotEmpty(inputFormatClass)) {
@@ -65,6 +74,13 @@ public final class SerdeUtils {
return scheme;
}
+ /**
+ * Get a OutputSerializer instance based on specific configurations.
+ * @param outputFormatClass output format class
+ * @param properties Properties
+ * @param fieldNames field names
+ * @return the OutputSerializer instance
+ */
public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List<String> fieldNames) {
IOutputSerializer serializer;
if (isNotEmpty(outputFormatClass)) {
@@ -89,7 +105,12 @@ public final class SerdeUtils {
return serializer;
}
- public static Object convertAvroUtf8(Object value){
+ /**
+ * Convert a Avro object to a Java object, changing the Avro Utf8 type to Java String.
+ * @param value Avro object
+ * @return Java object
+ */
+ public static Object convertAvroUtf8(Object value) {
Object ret;
if (value instanceof Utf8) {
ret = value.toString();
@@ -103,7 +124,7 @@ public final class SerdeUtils {
return ret;
}
- public static Object convertAvroUtf8Map(Map<Object,Object> value) {
+ private static Object convertAvroUtf8Map(Map<Object,Object> value) {
Map<Object, Object> map = new HashMap<>(value.size());
for (Map.Entry<Object, Object> entry : value.entrySet()) {
Object k = convertAvroUtf8(entry.getKey());
@@ -113,9 +134,9 @@ public final class SerdeUtils {
return map;
}
- public static Object convertAvroUtf8Array(GenericData.Array value){
+ private static Object convertAvroUtf8Array(GenericData.Array value) {
List<Object> ls = new ArrayList<>(value.size());
- for(Object o : value){
+ for (Object o : value) {
ls.add(convertAvroUtf8(o));
}
return ls;
http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
index a0f3af3..59de0c0 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.runtime.utils;
import java.util.LinkedList;
@@ -23,27 +24,27 @@ import java.util.List;
public final class Utils {
/**
- * This method for splitting string into parts by a delimiter
- * It has higher performance than String.split(String regex)
+ * This method for splitting string into parts by a delimiter.
+ * It has higher performance than String.split(String regex).
*
- * @param data
- * @param delimiter
- * @return
+ * @param data need split string
+ * @param delimiter the delimiter
+ * @return string list
*/
- public static List<String> split(String data, char delimiter){
+ public static List<String> split(String data, char delimiter) {
List<String> list = new LinkedList<>();
//do not use .toCharArray avoid system copy
StringBuilder sb = new StringBuilder(512);
int len = data.length();
- for (int i=0; i < len; i++) {
+ for (int i = 0; i < len; i++) {
char ch = data.charAt(i);
- if(ch == delimiter){
+ if (ch == delimiter) {
list.add(sb.toString());
sb.setLength(0);
- if(i == len - 1){
+ if (i == len - 1) {
list.add("");
}
- }else{
+ } else {
sb.append(ch);
}
}
[2/2] storm git commit: Merge branch 'STORM-2634' of
https://github.com/vesense/storm
Posted by xi...@apache.org.
Merge branch 'STORM-2634' of https://github.com/vesense/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/72e219c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/72e219c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/72e219c1
Branch: refs/heads/master
Commit: 72e219c1551aa999a51e6f6ce91f3dbaa279494c
Parents: 2fc414d 2a36ec7
Author: Xin Wang <be...@163.com>
Authored: Sun Jul 16 19:13:05 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Jul 16 19:13:05 2017 +0800
----------------------------------------------------------------------
sql/storm-sql-runtime/pom.xml | 3 -
.../calcite/interpreter/StormContext.java | 5 +-
.../sql/runtime/AbstractChannelHandler.java | 43 +++---
.../sql/runtime/AbstractValuesProcessor.java | 23 +--
.../storm/sql/runtime/ChannelContext.java | 18 ++-
.../storm/sql/runtime/ChannelHandler.java | 21 +--
.../org/apache/storm/sql/runtime/Channels.java | 149 ++++++++++---------
.../apache/storm/sql/runtime/DataSource.java | 3 +-
.../storm/sql/runtime/DataSourcesProvider.java | 42 +++---
.../storm/sql/runtime/DataSourcesRegistry.java | 96 +++++++-----
.../org/apache/storm/sql/runtime/FieldInfo.java | 43 +++---
.../storm/sql/runtime/IOutputSerializer.java | 15 +-
.../sql/runtime/ISqlTridentDataSource.java | 72 ++++-----
.../sql/runtime/SimpleSqlTridentConsumer.java | 1 +
.../storm/sql/runtime/StormSqlFunctions.java | 34 +++--
.../runtime/calcite/ExecutableExpression.java | 5 +-
.../sql/runtime/calcite/StormDataContext.java | 13 +-
.../socket/SocketDataSourcesProvider.java | 13 +-
.../datasource/socket/trident/SocketState.java | 12 +-
.../socket/trident/SocketStateUpdater.java | 6 +-
.../socket/trident/TridentSocketSpout.java | 33 ++--
.../sql/runtime/serde/avro/AvroScheme.java | 74 ++++-----
.../sql/runtime/serde/avro/AvroSerializer.java | 75 +++++-----
.../sql/runtime/serde/avro/CachedSchemas.java | 12 +-
.../storm/sql/runtime/serde/csv/CsvScheme.java | 60 ++++----
.../sql/runtime/serde/csv/CsvSerializer.java | 39 ++---
.../sql/runtime/serde/json/JsonScheme.java | 56 +++----
.../sql/runtime/serde/json/JsonSerializer.java | 46 +++---
.../storm/sql/runtime/serde/tsv/TsvScheme.java | 54 +++----
.../sql/runtime/serde/tsv/TsvSerializer.java | 41 ++---
.../trident/functions/EvaluationCalc.java | 17 ++-
.../trident/functions/EvaluationFilter.java | 10 +-
.../trident/functions/EvaluationFunction.java | 11 +-
.../trident/functions/ForwardFunction.java | 1 +
.../storm/sql/runtime/utils/FieldInfoUtils.java | 4 +-
.../storm/sql/runtime/utils/SerdeUtils.java | 41 +++--
.../apache/storm/sql/runtime/utils/Utils.java | 21 +--
37 files changed, 677 insertions(+), 535 deletions(-)
----------------------------------------------------------------------