You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/07/16 11:13:18 UTC

[1/2] storm git commit: STORM-2634: Apply new code style to storm-sql-runtime

Repository: storm
Updated Branches:
  refs/heads/master 2fc414d00 -> 72e219c15


STORM-2634: Apply new code style to storm-sql-runtime


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2a36ec7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2a36ec7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2a36ec7b

Branch: refs/heads/master
Commit: 2a36ec7bbe88d61685886b9de291b7fcc027b899
Parents: d7c7818
Author: Xin Wang <be...@163.com>
Authored: Sat Jul 15 16:10:39 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sat Jul 15 16:10:39 2017 +0800

----------------------------------------------------------------------
 sql/storm-sql-runtime/pom.xml                   |   3 -
 .../calcite/interpreter/StormContext.java       |   5 +-
 .../sql/runtime/AbstractChannelHandler.java     |  43 +++---
 .../sql/runtime/AbstractValuesProcessor.java    |  23 +--
 .../storm/sql/runtime/ChannelContext.java       |  18 ++-
 .../storm/sql/runtime/ChannelHandler.java       |  21 +--
 .../org/apache/storm/sql/runtime/Channels.java  | 149 ++++++++++---------
 .../apache/storm/sql/runtime/DataSource.java    |   3 +-
 .../storm/sql/runtime/DataSourcesProvider.java  |  42 +++---
 .../storm/sql/runtime/DataSourcesRegistry.java  |  96 +++++++-----
 .../org/apache/storm/sql/runtime/FieldInfo.java |  43 +++---
 .../storm/sql/runtime/IOutputSerializer.java    |  15 +-
 .../sql/runtime/ISqlTridentDataSource.java      |  72 ++++-----
 .../sql/runtime/SimpleSqlTridentConsumer.java   |   1 +
 .../storm/sql/runtime/StormSqlFunctions.java    |  34 +++--
 .../runtime/calcite/ExecutableExpression.java   |   5 +-
 .../sql/runtime/calcite/StormDataContext.java   |  13 +-
 .../socket/SocketDataSourcesProvider.java       |  13 +-
 .../datasource/socket/trident/SocketState.java  |  12 +-
 .../socket/trident/SocketStateUpdater.java      |   6 +-
 .../socket/trident/TridentSocketSpout.java      |  33 ++--
 .../sql/runtime/serde/avro/AvroScheme.java      |  74 ++++-----
 .../sql/runtime/serde/avro/AvroSerializer.java  |  75 +++++-----
 .../sql/runtime/serde/avro/CachedSchemas.java   |  12 +-
 .../storm/sql/runtime/serde/csv/CsvScheme.java  |  60 ++++----
 .../sql/runtime/serde/csv/CsvSerializer.java    |  39 ++---
 .../sql/runtime/serde/json/JsonScheme.java      |  56 +++----
 .../sql/runtime/serde/json/JsonSerializer.java  |  46 +++---
 .../storm/sql/runtime/serde/tsv/TsvScheme.java  |  54 +++----
 .../sql/runtime/serde/tsv/TsvSerializer.java    |  41 ++---
 .../trident/functions/EvaluationCalc.java       |  17 ++-
 .../trident/functions/EvaluationFilter.java     |  10 +-
 .../trident/functions/EvaluationFunction.java   |  11 +-
 .../trident/functions/ForwardFunction.java      |   1 +
 .../storm/sql/runtime/utils/FieldInfoUtils.java |   4 +-
 .../storm/sql/runtime/utils/SerdeUtils.java     |  41 +++--
 .../apache/storm/sql/runtime/utils/Utils.java   |  21 +--
 37 files changed, 677 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/pom.xml b/sql/storm-sql-runtime/pom.xml
index 511c792..f9969b2 100644
--- a/sql/storm-sql-runtime/pom.xml
+++ b/sql/storm-sql-runtime/pom.xml
@@ -135,9 +135,6 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
-                <configuration>
-                    <maxAllowedViolations>485</maxAllowedViolations>
-                </configuration>
             </plugin>
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
index aa7e435..700284c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.calcite.interpreter;
 
-import org.apache.calcite.DataContext;
+package org.apache.calcite.interpreter;
 
 import java.io.Serializable;
 
+import org.apache.calcite.DataContext;
+
 /**
  * This is a hack to use Calcite Context.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
index 64be39d..effdf55 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -15,38 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.tuple.Values;
 
 public abstract class AbstractChannelHandler implements ChannelHandler {
-  @Override
-  public abstract void dataReceived(ChannelContext ctx, Values data);
-
-  @Override
-  public void channelInactive(ChannelContext ctx) {
-
-  }
+    @Override
+    public abstract void dataReceived(ChannelContext ctx, Values data);
 
-  @Override
-  public void exceptionCaught(Throwable cause) {
+    @Override
+    public void channelInactive(ChannelContext ctx) {
 
-  }
+    }
 
-  @Override
-  public void flush(ChannelContext ctx) {
-    ctx.flush();
-  }
+    @Override
+    public void exceptionCaught(Throwable cause) {
 
-  @Override
-  public void setSource(ChannelContext ctx, Object source) {
+    }
 
-  }
+    @Override
+    public void flush(ChannelContext ctx) {
+        ctx.flush();
+    }
 
-  public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
     @Override
-    public void dataReceived(ChannelContext ctx, Values data) {
-      ctx.emit(data);
+    public void setSource(ChannelContext ctx, Object source) {
+
     }
-  };
+
+    public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+        @Override
+        public void dataReceived(ChannelContext ctx, Values data) {
+            ctx.emit(data);
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
index 6a853be..1449325 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.runtime;
 
-import org.apache.storm.tuple.Values;
+package org.apache.storm.sql.runtime;
 
 import java.util.Map;
 
+import org.apache.storm.tuple.Values;
+
 /**
  * Subclass of AbstractTupleProcessor provides a series of tuple. It
  * takes a series of iterators of {@link Values} and produces a stream of
  * tuple.
- *
+ * <p/>
  * The subclass implements the {@see next()} method to provide
  * the output of the stream. It can choose to return null in {@see next()} to
  * indicate that this particular iteration is a no-op. SQL processors depend
@@ -33,12 +34,12 @@ import java.util.Map;
  */
 public abstract class AbstractValuesProcessor {
 
-  /**
-   * Initialize the data sources.
-   *
-   * @param data a map from the table name to the iterators of the values.
-   *
-   */
-  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
-      result);
+    /**
+     * Initialize the data sources.
+     *
+     * @param data a map from the table name to the iterators of the values.
+     *
+     */
+    public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+            result);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
index 65ad01c..9d6f662 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -15,16 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.tuple.Values;
 
 public interface ChannelContext {
-  /**
-   * Emit data to the next stage of the data pipeline.
-   */
-  void emit(Values data);
-  void fireChannelInactive();
-  void flush();
-  void setSource(Object source);
+    /**
+     * Emit data to the next stage of the data pipeline.
+     */
+    void emit(Values data);
+
+    void fireChannelInactive();
+
+    void flush();
+
+    void setSource(Object source);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
index af02b7e..3009df4 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.tuple.Values;
@@ -24,18 +25,18 @@ import org.apache.storm.tuple.Values;
  * series of events.
  */
 public interface ChannelHandler {
-  void dataReceived(ChannelContext ctx, Values data);
+    void dataReceived(ChannelContext ctx, Values data);
 
-  /**
-   * The producer of the data has indicated that the channel is no longer
-   * active.
-   * @param ctx
-   */
-  void channelInactive(ChannelContext ctx);
+    /**
+     * The producer of the data has indicated that the channel is no longer
+     * active.
+     * @param ctx ChannelContext
+     */
+    void channelInactive(ChannelContext ctx);
 
-  void exceptionCaught(Throwable cause);
+    void exceptionCaught(Throwable cause);
 
-  void flush(ChannelContext ctx);
+    void flush(ChannelContext ctx);
 
-  void setSource(ChannelContext ctx, Object source);
+    void setSource(ChannelContext ctx, Object source);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
index 3b5eedd..d389c38 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -15,95 +15,96 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.tuple.Values;
 
 public class Channels {
-  private static final ChannelContext VOID_CTX = new ChannelContext() {
-    @Override
-    public void emit(Values data) {}
-
-    @Override
-    public void fireChannelInactive() {}
-
-    @Override
-    public void flush() {
-
-    }
-
-    @Override
-    public void setSource(java.lang.Object source) {
-
-    }
-  };
-
-  private static class ChannelContextAdapter implements ChannelContext {
-    private final ChannelHandler handler;
-    private final ChannelContext next;
-
-    public ChannelContextAdapter(
-        ChannelContext next, ChannelHandler handler) {
-      this.handler = handler;
-      this.next = next;
+    private static final ChannelContext VOID_CTX = new ChannelContext() {
+        @Override
+        public void emit(Values data) {}
+
+        @Override
+        public void fireChannelInactive() {}
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void setSource(java.lang.Object source) {
+
+        }
+    };
+
+    private static class ChannelContextAdapter implements ChannelContext {
+        private final ChannelHandler handler;
+        private final ChannelContext next;
+
+        public ChannelContextAdapter(
+                ChannelContext next, ChannelHandler handler) {
+            this.handler = handler;
+            this.next = next;
+        }
+
+        @Override
+        public void emit(Values data) {
+            handler.dataReceived(next, data);
+        }
+
+        @Override
+        public void fireChannelInactive() {
+            handler.channelInactive(next);
+        }
+
+        @Override
+        public void flush() {
+            handler.flush(next);
+        }
+
+        @Override
+        public void setSource(java.lang.Object source) {
+            handler.setSource(next, source);
+            next.setSource(source); // propagate through the chain
+        }
     }
 
-    @Override
-    public void emit(Values data) {
-      handler.dataReceived(next, data);
-    }
-
-    @Override
-    public void fireChannelInactive() {
-      handler.channelInactive(next);
-    }
-
-    @Override
-    public void flush() {
-      handler.flush(next);
-    }
+    private static class ForwardingChannelContext implements ChannelContext {
+        private final ChannelContext next;
 
-    @Override
-    public void setSource(java.lang.Object source) {
-      handler.setSource(next, source);
-      next.setSource(source); // propagate through the chain
-    }
-  }
+        public ForwardingChannelContext(ChannelContext next) {
+            this.next = next;
+        }
 
-  private static class ForwardingChannelContext implements ChannelContext {
-    private final ChannelContext next;
+        @Override
+        public void emit(Values data) {
+            next.emit(data);
+        }
 
-    public ForwardingChannelContext(ChannelContext next) {
-      this.next = next;
-    }
+        @Override
+        public void fireChannelInactive() {
+            next.fireChannelInactive();
+        }
 
-    @Override
-    public void emit(Values data) {
-      next.emit(data);
-    }
+        @Override
+        public void flush() {
+            next.flush();
+        }
 
-    @Override
-    public void fireChannelInactive() {
-      next.fireChannelInactive();
+        @Override
+        public void setSource(Object source) {
+            next.setSource(source);
+        }
     }
 
-    @Override
-    public void flush() {
-      next.flush();
+    public static ChannelContext chain(
+            ChannelContext next, ChannelHandler handler) {
+        return new ChannelContextAdapter(next, handler);
     }
 
-    @Override
-    public void setSource(Object source) {
-      next.setSource(source);
+    public static ChannelContext voidContext() {
+        return VOID_CTX;
     }
-  }
-
-  public static ChannelContext chain(
-      ChannelContext next, ChannelHandler handler) {
-    return new ChannelContextAdapter(next, handler);
-  }
-
-  public static ChannelContext voidContext() {
-    return VOID_CTX;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
index 352af73..b90b4f5 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 /**
@@ -23,5 +24,5 @@ package org.apache.storm.sql.runtime;
  *
  */
 public interface DataSource {
-  void open(ChannelContext ctx);
+    void open(ChannelContext ctx);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index dbece9c..baec6cd 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import java.net.URI;
@@ -22,26 +23,27 @@ import java.util.List;
 import java.util.Properties;
 
 public interface DataSourcesProvider {
-  /**
-   * @return the scheme of the data source
-   */
-  String scheme();
+    /**
+     * Get the scheme of the data source.
+     * @return the scheme of the data source
+     */
+    String scheme();
 
-  /**
-   * Construct a new data source.
-   * @param uri The URI that specifies the data source. The format of the URI
-   *            is fully customizable.
-   * @param inputFormatClass the name of the class that deserializes data.
-   *                         It is null when unspecified.
-   * @param outputFormatClass the name of the class that serializes data. It
-   *                          is null when unspecified.
-   * @param fields The name of the fields and the schema of the table.
-   */
-  DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields);
+    /**
+     * Construct a new data source.
+     * @param uri The URI that specifies the data source. The format of the URI
+     *            is fully customizable.
+     * @param inputFormatClass the name of the class that deserializes data.
+     *                         It is null when unspecified.
+     * @param outputFormatClass the name of the class that serializes data. It
+     *                          is null when unspecified.
+     * @param fields The name of the fields and the schema of the table.
+     */
+    DataSource construct(
+            URI uri, String inputFormatClass, String outputFormatClass,
+            List<FieldInfo> fields);
 
-  ISqlTridentDataSource constructTrident(
-          URI uri, String inputFormatClass, String outputFormatClass,
-          Properties properties, List<FieldInfo> fields);
+    ISqlTridentDataSource constructTrident(
+            URI uri, String inputFormatClass, String outputFormatClass,
+            Properties properties, List<FieldInfo> fields);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index dfefb01..9d8368a 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -15,10 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.runtime;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.sql.runtime;
 
 import java.net.URI;
 import java.util.HashMap;
@@ -27,50 +25,70 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.ServiceLoader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class DataSourcesRegistry {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      DataSourcesRegistry.class);
-  private static final Map<String, DataSourcesProvider> providers;
+    private static final Logger LOG = LoggerFactory.getLogger(
+            DataSourcesRegistry.class);
+    private static final Map<String, DataSourcesProvider> providers;
 
-  static {
-    providers = new HashMap<>();
-    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
-        DataSourcesProvider.class);
-    for (DataSourcesProvider p : loader) {
-      LOG.info("Registering scheme {} with {}", p.scheme(), p);
-      providers.put(p.scheme(), p);
+    static {
+        providers = new HashMap<>();
+        ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+                DataSourcesProvider.class);
+        for (DataSourcesProvider p : loader) {
+            LOG.info("Registering scheme {} with {}", p.scheme(), p);
+            providers.put(p.scheme(), p);
+        }
     }
-  }
-
-  private DataSourcesRegistry() {
-  }
 
-  public static DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields) {
-    DataSourcesProvider provider = providers.get(uri.getScheme());
-    if (provider == null) {
-      return null;
+    private DataSourcesRegistry() {
     }
 
-    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
-  }
+    /**
+     * Construct a data source.
+     * @param uri data source uri
+     * @param inputFormatClass input format class
+     * @param outputFormatClass output format class
+     * @param fields fields info list
+     * @return DataSource object
+     */
+    public static DataSource construct(
+            URI uri, String inputFormatClass, String outputFormatClass,
+            List<FieldInfo> fields) {
+        DataSourcesProvider provider = providers.get(uri.getScheme());
+        if (provider == null) {
+            return null;
+        }
 
-  public static ISqlTridentDataSource constructTridentDataSource(
-          URI uri, String inputFormatClass, String outputFormatClass,
-          Properties properties, List<FieldInfo> fields) {
-    DataSourcesProvider provider = providers.get(uri.getScheme());
-    if (provider == null) {
-      return null;
+        return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
     }
 
-    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
-  }
+    /**
+     * Construct a trident data source.
+     * @param uri data source uri
+     * @param inputFormatClass input format class
+     * @param outputFormatClass output format class
+     * @param properties Properties
+     * @param fields fields info list
+     * @return TridentDataSource object
+     */
+    public static ISqlTridentDataSource constructTridentDataSource(
+            URI uri, String inputFormatClass, String outputFormatClass,
+            Properties properties, List<FieldInfo> fields) {
+        DataSourcesProvider provider = providers.get(uri.getScheme());
+        if (provider == null) {
+            return null;
+        }
 
-  /**
-   * Allow unit tests to inject data sources.
-   */
-  public static Map<String, DataSourcesProvider> providerMap() {
-    return providers;
-  }
+        return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+    }
+
+    /**
+     * Allow unit tests to inject data sources.
+     */
+    public static Map<String, DataSourcesProvider> providerMap() {
+        return providers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
index 03b030b..c547851 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -15,33 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import java.io.Serializable;
 
 /**
- * Describe each column of the field
+ * Describe each column of the field.
  */
 public class FieldInfo implements Serializable {
-  private final String name;
-  private final Class<?> type;
-  private final boolean isPrimary;
+    private final String name;
+    private final Class<?> type;
+    private final boolean isPrimary;
 
-  public FieldInfo(String name, Class<?> type, boolean isPrimary) {
-    this.name = name;
-    this.type = type;
-    this.isPrimary = isPrimary;
-  }
+    /**
+     * FieldInfo Constructor.
+     * @param name field name
+     * @param type field type
+     * @param isPrimary primary or not
+     */
+    public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+        this.name = name;
+        this.type = type;
+        this.isPrimary = isPrimary;
+    }
 
-  public String name() {
-    return name;
-  }
+    public String name() {
+        return name;
+    }
 
-  public Class<?> type() {
-    return type;
-  }
+    public Class<?> type() {
+        return type;
+    }
 
-  public boolean isPrimary() {
-    return isPrimary;
-  }
+    public boolean isPrimary() {
+        return isPrimary;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
index b6670d9..46d56b7 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 
 public interface IOutputSerializer {
-  /**
-   * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
-   * memory.
-   *
-   * @return A ByteBuffer contains the serialized result.
-   */
-  ByteBuffer write(List<Object> data, ByteBuffer buffer);
+    /**
+     * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+     * memory.
+     *
+     * @return A ByteBuffer contains the serialized result.
+     */
+    ByteBuffer write(List<Object> data, ByteBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
index 9eae5ae..81fb386 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.trident.spout.ITridentDataSource;
@@ -25,41 +26,42 @@ import org.apache.storm.trident.state.StateUpdater;
  * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
  */
 public interface ISqlTridentDataSource {
-  /**
-   * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
-   *
-   * Please note that StateFactory and StateUpdater should use same class which implements State.
-   *
-   * @see org.apache.storm.trident.state.StateFactory
-   * @see org.apache.storm.trident.state.StateUpdater
-   */
-  interface SqlTridentConsumer {
-    StateFactory getStateFactory();
-    StateUpdater getStateUpdater();
-  }
+    /**
+     * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
+     * <p/>
+     * Please note that StateFactory and StateUpdater should use same class which implements State.
+     *
+     * @see org.apache.storm.trident.state.StateFactory
+     * @see org.apache.storm.trident.state.StateUpdater
+     */
+    interface SqlTridentConsumer {
+        StateFactory getStateFactory();
+
+        StateUpdater getStateUpdater();
+    }
 
-  /**
-   * Provides instance of ITridentDataSource which can be used as producer in Trident.
-   *
-   * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
-   * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
-   * - IBatchSpout
-   * - ITridentSpout
-   * - IPartitionedTridentSpout
-   * - IOpaquePartitionedTridentSpout
-   *
-   * @see org.apache.storm.trident.spout.ITridentDataSource
-   * @see org.apache.storm.trident.spout.IBatchSpout
-   * @see org.apache.storm.trident.spout.ITridentSpout
-   * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
-   * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
-   */
-  ITridentDataSource getProducer();
+    /**
+     * Provides instance of ITridentDataSource which can be used as producer in Trident.
+     * <p/>
+     * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
+     * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
+     * - IBatchSpout
+     * - ITridentSpout
+     * - IPartitionedTridentSpout
+     * - IOpaquePartitionedTridentSpout
+     *
+     * @see org.apache.storm.trident.spout.ITridentDataSource
+     * @see org.apache.storm.trident.spout.IBatchSpout
+     * @see org.apache.storm.trident.spout.ITridentSpout
+     * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
+     * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
+     */
+    ITridentDataSource getProducer();
 
-  /**
-   * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
-   *
-   * @see SqlTridentConsumer
-   */
-  SqlTridentConsumer getConsumer();
+    /**
+     * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
+     *
+     * @see SqlTridentConsumer
+     */
+    SqlTridentConsumer getConsumer();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
index c9abd16..f9a6039 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 import org.apache.storm.trident.state.StateFactory;

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
index a373483..095e6ba 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -15,20 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime;
 
 public class StormSqlFunctions {
-  public static Boolean eq(Object b0, Object b1) {
-    if (b0 == null || b1 == null) {
-      return null;
+
+    /**
+     * Whether the object equals the other one.
+     * @param b0 one object
+     * @param b1 the other object
+     * @return true if the object equals the other one
+     */
+    public static Boolean eq(Object b0, Object b1) {
+        if (b0 == null || b1 == null) {
+            return null;
+        }
+        return b0.equals(b1);
     }
-    return b0.equals(b1);
-  }
 
-  public static Boolean ne(Object b0, Object b1) {
-    if (b0 == null || b1 == null) {
-      return null;
+    /**
+     * Whether the object does not equal the other one.
+     * @param b0 one object
+     * @param b1 the other object
+     * @return true if the object does not equal the other one
+     */
+    public static Boolean ne(Object b0, Object b1) {
+        if (b0 == null || b1 == null) {
+            return null;
+        }
+        return !b0.equals(b1);
     }
-    return !b0.equals(b1);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
index 8416945..699a056 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/ExecutableExpression.java
@@ -18,14 +18,15 @@
 
 package org.apache.storm.sql.runtime.calcite;
 
-import org.apache.calcite.interpreter.Context;
-
 import java.io.Serializable;
 
+import org.apache.calcite.interpreter.Context;
+
 /**
  * Compiled executable expression.
  */
 public interface ExecutableExpression extends Serializable {
     Object execute(Context context);
+
     void execute(Context context, Object[] results);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
index 4861b43..6f506cc 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/calcite/StormDataContext.java
@@ -15,9 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.calcite;
 
 import com.google.common.collect.ImmutableMap;
+
+import java.io.Serializable;
+import java.util.Calendar;
+import java.util.TimeZone;
+
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.QueryProvider;
@@ -25,16 +31,15 @@ import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.util.Holder;
 
-import java.io.Serializable;
-import java.util.Calendar;
-import java.util.TimeZone;
-
 /**
  * This is based on SlimDataContext in Calcite, and borrow some from DataContextImpl in Calcite.
  */
 public class StormDataContext implements DataContext, Serializable {
     private final ImmutableMap<Object, Object> map;
 
+    /**
+     * StormDataContext Constructor.
+     */
     public StormDataContext() {
         // Store the time at which the query started executing. The SQL
         // standard says that functions such as CURRENT_TIMESTAMP return the

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
index 0e65220..fe4d024 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm.sql.runtime.datasource.socket;
 
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
@@ -34,14 +38,10 @@ import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.StateUpdater;
 
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Create a Socket data source based on the URI and properties. The URI has the format of
  * socket://[host]:[port]. Both of host and port are mandatory.
- *
+ * <p/>
  * Note that it connects to given host and port, and receive the message if it's used for input source,
  * and send the message if it's used for output data source.
  */
@@ -84,7 +84,8 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
     }
 
     @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) {
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  Properties properties, List<FieldInfo> fields) {
         String host = uri.getHost();
         int port = uri.getPort();
         if (port == -1) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
index c12f8bd..60c1a5e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
@@ -18,16 +18,16 @@
 
 package org.apache.storm.sql.runtime.datasource.socket.trident;
 
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.Socket;
 import java.util.Map;
 
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
 /**
  * Trident State implementation of Socket. It only supports writing.
  */
@@ -64,8 +64,8 @@ public class SocketState implements State {
                 Socket socket = new Socket(host, port);
                 out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
             } catch (IOException e) {
-                throw new RuntimeException("Exception while initializing socket for State. host " +
-                        host + " port " + port, e);
+                throw new RuntimeException("Exception while initializing socket for State. host "
+                        + host + " port " + port, e);
             }
 
             // State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
index 3062a90..df2893b 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.sql.runtime.datasource.socket.trident;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.storm.sql.runtime.IOutputSerializer;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
@@ -25,9 +28,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
index 1bd9251..aad42fb 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
@@ -19,14 +19,6 @@
 package org.apache.storm.sql.runtime.datasource.socket.trident;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.Closeable;
@@ -41,6 +33,15 @@ import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
  */
@@ -51,7 +52,7 @@ public class TridentSocketSpout implements IBatchSpout {
     private final int port;
     private final Scheme scheme;
 
-    private volatile boolean _running = true;
+    private volatile boolean running = true;
 
     private BlockingDeque<String> queue;
     private Socket socket;
@@ -61,6 +62,12 @@ public class TridentSocketSpout implements IBatchSpout {
 
     private Map<Long, List<List<Object>>> batches;
 
+    /**
+     * TridentSocketSpout Constructor.
+     * @param scheme Scheme
+     * @param host socket host
+     * @param port socket port
+     */
     public TridentSocketSpout(Scheme scheme, String host, int port) {
         this.scheme = scheme;
         this.host = host;
@@ -91,7 +98,7 @@ public class TridentSocketSpout implements IBatchSpout {
         if (batch == null) {
             batch = new ArrayList<>();
 
-            while(queue.peek() != null) {
+            while (queue.peek() != null) {
                 String line = queue.poll();
                 List<Object> values = convertLineToTuple(line);
                 if (values == null) {
@@ -120,7 +127,7 @@ public class TridentSocketSpout implements IBatchSpout {
 
     @Override
     public void close() {
-        _running = false;
+        running = false;
         readerThread.interrupt();
         queue.clear();
 
@@ -142,7 +149,7 @@ public class TridentSocketSpout implements IBatchSpout {
 
     private class SocketReaderRunnable implements Runnable {
         public void run() {
-            while (_running) {
+            while (running) {
                 try {
                     String line = in.readLine();
                     if (line == null) {
@@ -160,7 +167,7 @@ public class TridentSocketSpout implements IBatchSpout {
 
     private void die(Throwable t) {
         LOG.error("Halting process: TridentSocketSpout died.", t);
-        if (_running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+        if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
             System.exit(11);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
index 3bf1a23..147afe6 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
@@ -15,8 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.avro;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -28,47 +34,47 @@ import org.apache.storm.sql.runtime.utils.SerdeUtils;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * AvroScheme uses generic(without code generation) instead of specific(with code generation) readers.
  */
 public class AvroScheme implements Scheme {
-  private final String schemaString;
-  private final List<String> fieldNames;
-  private final CachedSchemas schemas;
+    private final String schemaString;
+    private final List<String> fieldNames;
+    private final CachedSchemas schemas;
 
-  public AvroScheme(String schemaString, List<String> fieldNames) {
-    this.schemaString = schemaString;
-    this.fieldNames = fieldNames;
-    this.schemas = new CachedSchemas();
-  }
+    /**
+     * AvroScheme Constructor.
+     * @param schemaString schema string
+     * @param fieldNames field names
+     */
+    public AvroScheme(String schemaString, List<String> fieldNames) {
+        this.schemaString = schemaString;
+        this.fieldNames = fieldNames;
+        this.schemas = new CachedSchemas();
+    }
 
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    try {
-      Schema schema = schemas.getSchema(schemaString);
-      DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
-      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
-      GenericRecord record = reader.read(null, decoder);
+    @Override
+    public List<Object> deserialize(ByteBuffer ser) {
+        try {
+            Schema schema = schemas.getSchema(schemaString);
+            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
+            GenericRecord record = reader.read(null, decoder);
 
-      ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-      for (String field : fieldNames) {
-        Object value = record.get(field);
-        // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
-        list.add(SerdeUtils.convertAvroUtf8(value));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+            ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+            for (String field : fieldNames) {
+                Object value = record.get(field);
+                // Avro strings are stored using a special Avro Utf8 type instead of using Java primitives
+                list.add(SerdeUtils.convertAvroUtf8(value));
+            }
+            return list;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(fieldNames);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
index 5dc3393..6e4204e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroSerializer.java
@@ -15,9 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.avro;
 
 import com.google.common.base.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -27,46 +35,45 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.storm.sql.runtime.IOutputSerializer;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
 /**
  * AvroSerializer uses generic(without code generation) instead of specific(with code generation) writers.
  */
 public class AvroSerializer implements IOutputSerializer, Serializable {
-  private final String schemaString;
-  private final List<String> fieldNames;
-  private final CachedSchemas schemas;
+    private final String schemaString;
+    private final List<String> fieldNames;
+    private final CachedSchemas schemas;
 
-  public AvroSerializer(String schemaString, List<String> fieldNames) {
-    this.schemaString = schemaString;
-    this.fieldNames = fieldNames;
-    this.schemas = new CachedSchemas();
-  }
+    /**
+     * AvroSerializer Constructor.
+     * @param schemaString schema string
+     * @param fieldNames field names
+     */
+    public AvroSerializer(String schemaString, List<String> fieldNames) {
+        this.schemaString = schemaString;
+        this.fieldNames = fieldNames;
+        this.schemas = new CachedSchemas();
+    }
 
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
-    try {
-      Schema schema = schemas.getSchema(schemaString);
-      GenericRecord record = new GenericData.Record(schema);
-      for (int i = 0; i < fieldNames.size(); i++) {
-        record.put(fieldNames.get(i), data.get(i));
-      }
+    @Override
+    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+        Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schemas");
+        try {
+            Schema schema = schemas.getSchema(schemaString);
+            GenericRecord record = new GenericData.Record(schema);
+            for (int i = 0; i < fieldNames.size(); i++) {
+                record.put(fieldNames.get(i), data.get(i));
+            }
 
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
-      Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
-      writer.write(record, encoder);
-      encoder.flush();
-      byte[] bytes = out.toByteArray();
-      out.close();
-      return ByteBuffer.wrap(bytes);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
+            Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+            writer.write(record, encoder);
+            encoder.flush();
+            byte[] bytes = out.toByteArray();
+            out.close();
+            return ByteBuffer.wrap(bytes);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
index fae917b..28f6e86 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/CachedSchemas.java
@@ -15,20 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.runtime.serde.avro;
 
-import org.apache.avro.Schema;
+package org.apache.storm.sql.runtime.serde.avro;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.avro.Schema;
+
 // TODO this class is reserved for supporting messages with different schemas.
 // current only one schema in the cache
-public class CachedSchemas implements Serializable{
+public class CachedSchemas implements Serializable {
 
     private final Map<String, Schema> cache = new HashMap<>();
 
+    /**
+     * Get a schema based on schema string.
+     * @param schemaString schema string
+     * @return Schema object
+     */
     public Schema getSchema(String schemaString) {
         Schema schema = cache.get(schemaString);
         if (schema == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
index 34fb1bb..1bf3d4d 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvScheme.java
@@ -15,15 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.csv;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -31,6 +26,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
 /**
  * CsvScheme uses the standard RFC4180 CSV Parser
  * One of the difference from Tsv format is that fields with embedded commas will be quoted.
@@ -39,32 +41,32 @@ import java.util.List;
  * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
  */
 public class CsvScheme implements Scheme {
-  private final List<String> fieldNames;
+    private final List<String> fieldNames;
 
-  public CsvScheme(List<String> fieldNames) {
-    this.fieldNames = fieldNames;
-  }
+    public CsvScheme(List<String> fieldNames) {
+        this.fieldNames = fieldNames;
+    }
 
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    try {
-      String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
-      CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
-      CSVRecord record = parser.getRecords().get(0);
-      Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
+    @Override
+    public List<Object> deserialize(ByteBuffer ser) {
+        try {
+            String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+            CSVParser parser = CSVParser.parse(data, CSVFormat.RFC4180);
+            CSVRecord record = parser.getRecords().get(0);
+            Preconditions.checkArgument(record.size() == fieldNames.size(), "Invalid schema");
 
-      ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-      for (int i = 0; i < record.size(); i++) {
-        list.add(record.get(i));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+            ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+            for (int i = 0; i < record.size(); i++) {
+                list.add(record.get(i));
+            }
+            return list;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(fieldNames);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
index 0d3bd74..cbcdb3c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/csv/CsvSerializer.java
@@ -15,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.runtime.serde.csv;
 
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.storm.sql.runtime.IOutputSerializer;
+package org.apache.storm.sql.runtime.serde.csv;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -28,6 +25,10 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
 /**
  * CsvSerializer uses the standard RFC4180 CSV Parser
  * One of the difference from Tsv format is that fields with embedded commas will be quoted.
@@ -36,24 +37,24 @@ import java.util.List;
  * @see <a href="https://tools.ietf.org/html/rfc4180">RFC4180</a>
  */
 public class CsvSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fields; //reserved for future
+    private final List<String> fields; //reserved for future
 
-  public CsvSerializer(List<String> fields) {
+    public CsvSerializer(List<String> fields) {
         this.fields = fields;
     }
 
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    try {
-      StringWriter writer = new StringWriter();
-      CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
-      for (Object o : data) {
-        printer.print(o);
-      }
-      //since using StringWriter, we do not need to close it.
-      return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    @Override
+    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+        try {
+            StringWriter writer = new StringWriter();
+            CSVPrinter printer = new CSVPrinter(writer, CSVFormat.RFC4180);
+            for (Object o : data) {
+                printer.print(o);
+            }
+            //since using StringWriter, we do not need to close it.
+            return ByteBuffer.wrap(writer.getBuffer().toString().getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
index d288fa1..db202fc 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java
@@ -15,11 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.json;
 
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
@@ -28,31 +26,35 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
 public class JsonScheme implements Scheme {
-  private final List<String> fields;
-
-  public JsonScheme(List<String> fields) {
-    this.fields = fields;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
-      ArrayList<Object> list = new ArrayList<>(fields.size());
-      for (String f : fields) {
-        list.add(map.get(f));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    private final List<String> fields;
+
+    public JsonScheme(List<String> fields) {
+        this.fields = fields;
     }
-  }
 
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fields);
-  }
+    @Override
+    public List<Object> deserialize(ByteBuffer ser) {
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            @SuppressWarnings("unchecked")
+            HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+            ArrayList<Object> list = new ArrayList<>(fields.size());
+            for (String f : fields) {
+                list.add(map.get(f));
+            }
+            return list;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(fields);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
index 1e825c4..0f40fdf 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.json;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -29,29 +29,31 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
 public class JsonSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fieldNames;
-  private final JsonFactory jsonFactory;
+    private final List<String> fieldNames;
+    private final JsonFactory jsonFactory;
 
-  public JsonSerializer(List<String> fieldNames) {
-    this.fieldNames = fieldNames;
-    jsonFactory = new JsonFactory();
-  }
+    public JsonSerializer(List<String> fieldNames) {
+        this.fieldNames = fieldNames;
+        jsonFactory = new JsonFactory();
+    }
 
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
-    StringWriter sw = new StringWriter();
-    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
-      jg.writeStartObject();
-      for (int i = 0; i < fieldNames.size(); ++i) {
-        jg.writeFieldName(fieldNames.get(i));
-        jg.writeObject(data.get(i));
-      }
-      jg.writeEndObject();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    @Override
+    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+        Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+        StringWriter sw = new StringWriter();
+        try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+            jg.writeStartObject();
+            for (int i = 0; i < fieldNames.size(); ++i) {
+                jg.writeFieldName(fieldNames.get(i));
+                jg.writeObject(data.get(i));
+            }
+            jg.writeEndObject();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
     }
-    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
index 310494c..7d9a896 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvScheme.java
@@ -15,44 +15,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.serde.tsv;
 
 import com.google.common.base.Preconditions;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
 /**
  * TsvScheme uses a simple delimited format implementation by splitting strings,
  * and it supports a user-defined delimiter.
  */
 public class TsvScheme implements Scheme {
-  private final List<String> fieldNames;
-  private final char delimiter;
-
-  public TsvScheme(List<String> fieldNames, char delimiter) {
-    this.fieldNames = fieldNames;
-    this.delimiter = delimiter;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
-    List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
-    Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
-
-    ArrayList<Object> list = new ArrayList<>(fieldNames.size());
-    list.addAll(parts);
-    return list;
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fieldNames);
-  }
+    private final List<String> fieldNames;
+    private final char delimiter;
+
+    public TsvScheme(List<String> fieldNames, char delimiter) {
+        this.fieldNames = fieldNames;
+        this.delimiter = delimiter;
+    }
+
+    @Override
+    public List<Object> deserialize(ByteBuffer ser) {
+        String data = new String(Utils.toByteArray(ser), StandardCharsets.UTF_8);
+        List<String> parts = org.apache.storm.sql.runtime.utils.Utils.split(data, delimiter);
+        Preconditions.checkArgument(parts.size() == fieldNames.size(), "Invalid schema");
+
+        ArrayList<Object> list = new ArrayList<>(fieldNames.size());
+        list.addAll(parts);
+        return list;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(fieldNames);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
index 1cf1c76..d5b41ac 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/tsv/TsvSerializer.java
@@ -15,40 +15,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.runtime.serde.tsv;
 
-import org.apache.storm.sql.runtime.IOutputSerializer;
+package org.apache.storm.sql.runtime.serde.tsv;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
 /**
  * TsvSerializer uses a simple delimited format implementation by splitting strings,
  * and it supports a user-defined delimiter.
  */
 public class TsvSerializer implements IOutputSerializer, Serializable {
-  private final List<String> fields; //reserved for future
-  private final char delimiter;
+    private final List<String> fields; //reserved for future
+    private final char delimiter;
 
-  public TsvSerializer(List<String> fields, char delimiter) {
-    this.fields = fields;
-    this.delimiter = delimiter;
+    public TsvSerializer(List<String> fields, char delimiter) {
+        this.fields = fields;
+        this.delimiter = delimiter;
     }
 
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
-    for (int i = 0; i < data.size(); i++) {
-      Object o = data.get(i);
-      if (i == 0) {
-        sb.append(o);
-      } else {
-        sb.append(delimiter);
-        sb.append(o);
-      }
+    @Override
+    public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+        StringBuilder sb = new StringBuilder(512); // 512: for most scenes to avoid inner array resizing
+        for (int i = 0; i < data.size(); i++) {
+            Object o = data.get(i);
+            if (i == 0) {
+                sb.append(o);
+            } else {
+                sb.append(delimiter);
+                sb.append(o);
+            }
+        }
+        return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
     }
-    return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
index 0318455..1b2f250 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
@@ -16,8 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.storm.sql.runtime.trident.functions;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
@@ -30,9 +34,6 @@ import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.Map;
-
 public class EvaluationCalc implements OperationAwareFlatMapFunction {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
 
@@ -41,7 +42,15 @@ public class EvaluationCalc implements OperationAwareFlatMapFunction {
     private final Object[] outputValues;
     private final DataContext dataContext;
 
-    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+    /**
+     * EvaluationCalc Constructor.
+     * @param filterInstance ExecutableExpression
+     * @param projectionInstance ExecutableExpression
+     * @param outputCount output count
+     * @param dataContext DataContext
+     */
+    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
+                          int outputCount, DataContext dataContext) {
         this.filterInstance = filterInstance;
         this.projectionInstance = projectionInstance;
         this.outputValues = new Object[outputCount];

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
index 81e81aa..83e187c 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -16,8 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.storm.sql.runtime.trident.functions;
 
+import java.util.Map;
+
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
@@ -29,8 +32,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public class EvaluationFilter extends BaseFilter {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
 
@@ -38,6 +39,11 @@ public class EvaluationFilter extends BaseFilter {
     private final DataContext dataContext;
     private final Object[] outputValues;
 
+    /**
+     * EvaluationFilter Constructor.
+     * @param filterInstance ExecutableExpression
+     * @param dataContext DataContext
+     */
     public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
         this.filterInstance = filterInstance;
         this.dataContext = dataContext;

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
index 728d75e..340492d 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -16,8 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.storm.sql.runtime.trident.functions;
 
+import java.util.Map;
+
 import org.apache.calcite.DataContext;
 import org.apache.calcite.interpreter.Context;
 import org.apache.calcite.interpreter.StormContext;
@@ -30,8 +33,6 @@ import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public class EvaluationFunction implements OperationAwareMapFunction {
     private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
 
@@ -39,6 +40,12 @@ public class EvaluationFunction implements OperationAwareMapFunction {
     private final Object[] outputValues;
     private final DataContext dataContext;
 
+    /**
+     * EvaluationFunction Constructor.
+     * @param projectionInstance ExecutableExpression
+     * @param outputCount output count
+     * @param dataContext DataContext
+     */
     public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
         this.projectionInstance = projectionInstance;
         this.outputValues = new Object[outputCount];

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
index 4c3a266..cb12ab3 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.storm.sql.runtime.trident.functions;
 
 import org.apache.storm.trident.operation.BaseFunction;

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
index efd5d25..26017dd 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.utils;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import org.apache.storm.sql.runtime.FieldInfo;
 
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.storm.sql.runtime.FieldInfo;
+
 public final class FieldInfoUtils {
 
     public static List<String> getFieldNames(List<FieldInfo> fields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
index 2dcd66c..85c3500 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
@@ -15,11 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.utils;
 
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.storm.spout.Scheme;
@@ -34,13 +42,14 @@ import org.apache.storm.sql.runtime.serde.tsv.TsvScheme;
 import org.apache.storm.sql.runtime.serde.tsv.TsvSerializer;
 import org.apache.storm.utils.ReflectionUtils;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 public final class SerdeUtils {
+    /**
+     * Get a Scheme instance based on specific configurations.
+     * @param inputFormatClass input format class
+     * @param properties Properties
+     * @param fieldNames field names
+     * @return the Scheme instance
+     */
     public static Scheme getScheme(String inputFormatClass, Properties properties, List<String> fieldNames) {
         Scheme scheme;
         if (isNotEmpty(inputFormatClass)) {
@@ -65,6 +74,13 @@ public final class SerdeUtils {
         return scheme;
     }
 
+    /**
+     * Get an OutputSerializer instance based on specific configurations.
+     * @param outputFormatClass output format class
+     * @param properties Properties
+     * @param fieldNames field names
+     * @return the OutputSerializer instance
+     */
     public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List<String> fieldNames) {
         IOutputSerializer serializer;
         if (isNotEmpty(outputFormatClass)) {
@@ -89,7 +105,12 @@ public final class SerdeUtils {
         return serializer;
     }
 
-    public static Object convertAvroUtf8(Object value){
+    /**
+     * Convert an Avro object to a Java object, changing the Avro Utf8 type to Java String.
+     * @param value Avro object
+     * @return Java object
+     */
+    public static Object convertAvroUtf8(Object value) {
         Object ret;
         if (value instanceof Utf8) {
             ret = value.toString();
@@ -103,7 +124,7 @@ public final class SerdeUtils {
         return ret;
     }
 
-    public static Object convertAvroUtf8Map(Map<Object,Object> value) {
+    private static Object convertAvroUtf8Map(Map<Object,Object> value) {
         Map<Object, Object> map = new HashMap<>(value.size());
         for (Map.Entry<Object, Object> entry : value.entrySet()) {
             Object k = convertAvroUtf8(entry.getKey());
@@ -113,9 +134,9 @@ public final class SerdeUtils {
         return map;
     }
 
-    public static Object convertAvroUtf8Array(GenericData.Array value){
+    private static Object convertAvroUtf8Array(GenericData.Array value) {
         List<Object> ls = new ArrayList<>(value.size());
-        for(Object o : value){
+        for (Object o : value) {
             ls.add(convertAvroUtf8(o));
         }
         return ls;

http://git-wip-us.apache.org/repos/asf/storm/blob/2a36ec7b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
index a0f3af3..59de0c0 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/Utils.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.sql.runtime.utils;
 
 import java.util.LinkedList;
@@ -23,27 +24,27 @@ import java.util.List;
 public final class Utils {
 
     /**
-     * This method for splitting string into parts by a delimiter
-     * It has higher performance than String.split(String regex)
+     * Splits a string into parts by a delimiter.
+     * It has higher performance than String.split(String regex).
      *
-     * @param data
-     * @param delimiter
-     * @return
+     * @param data the string to split
+     * @param delimiter the delimiter
+     * @return string list
      */
-    public static List<String> split(String data, char delimiter){
+    public static List<String> split(String data, char delimiter) {
         List<String> list = new LinkedList<>();
         //do not use .toCharArray, to avoid an extra array copy
         StringBuilder sb = new StringBuilder(512);
         int len = data.length();
-        for (int i=0; i < len; i++) {
+        for (int i = 0; i < len; i++) {
             char ch = data.charAt(i);
-            if(ch == delimiter){
+            if (ch == delimiter) {
                 list.add(sb.toString());
                 sb.setLength(0);
-                if(i == len - 1){
+                if (i == len - 1) {
                     list.add("");
                 }
-            }else{
+            } else {
                 sb.append(ch);
             }
         }


[2/2] storm git commit: Merge branch 'STORM-2634' of https://github.com/vesense/storm

Posted by xi...@apache.org.
Merge branch 'STORM-2634' of https://github.com/vesense/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/72e219c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/72e219c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/72e219c1

Branch: refs/heads/master
Commit: 72e219c1551aa999a51e6f6ce91f3dbaa279494c
Parents: 2fc414d 2a36ec7
Author: Xin Wang <be...@163.com>
Authored: Sun Jul 16 19:13:05 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Jul 16 19:13:05 2017 +0800

----------------------------------------------------------------------
 sql/storm-sql-runtime/pom.xml                   |   3 -
 .../calcite/interpreter/StormContext.java       |   5 +-
 .../sql/runtime/AbstractChannelHandler.java     |  43 +++---
 .../sql/runtime/AbstractValuesProcessor.java    |  23 +--
 .../storm/sql/runtime/ChannelContext.java       |  18 ++-
 .../storm/sql/runtime/ChannelHandler.java       |  21 +--
 .../org/apache/storm/sql/runtime/Channels.java  | 149 ++++++++++---------
 .../apache/storm/sql/runtime/DataSource.java    |   3 +-
 .../storm/sql/runtime/DataSourcesProvider.java  |  42 +++---
 .../storm/sql/runtime/DataSourcesRegistry.java  |  96 +++++++-----
 .../org/apache/storm/sql/runtime/FieldInfo.java |  43 +++---
 .../storm/sql/runtime/IOutputSerializer.java    |  15 +-
 .../sql/runtime/ISqlTridentDataSource.java      |  72 ++++-----
 .../sql/runtime/SimpleSqlTridentConsumer.java   |   1 +
 .../storm/sql/runtime/StormSqlFunctions.java    |  34 +++--
 .../runtime/calcite/ExecutableExpression.java   |   5 +-
 .../sql/runtime/calcite/StormDataContext.java   |  13 +-
 .../socket/SocketDataSourcesProvider.java       |  13 +-
 .../datasource/socket/trident/SocketState.java  |  12 +-
 .../socket/trident/SocketStateUpdater.java      |   6 +-
 .../socket/trident/TridentSocketSpout.java      |  33 ++--
 .../sql/runtime/serde/avro/AvroScheme.java      |  74 ++++-----
 .../sql/runtime/serde/avro/AvroSerializer.java  |  75 +++++-----
 .../sql/runtime/serde/avro/CachedSchemas.java   |  12 +-
 .../storm/sql/runtime/serde/csv/CsvScheme.java  |  60 ++++----
 .../sql/runtime/serde/csv/CsvSerializer.java    |  39 ++---
 .../sql/runtime/serde/json/JsonScheme.java      |  56 +++----
 .../sql/runtime/serde/json/JsonSerializer.java  |  46 +++---
 .../storm/sql/runtime/serde/tsv/TsvScheme.java  |  54 +++----
 .../sql/runtime/serde/tsv/TsvSerializer.java    |  41 ++---
 .../trident/functions/EvaluationCalc.java       |  17 ++-
 .../trident/functions/EvaluationFilter.java     |  10 +-
 .../trident/functions/EvaluationFunction.java   |  11 +-
 .../trident/functions/ForwardFunction.java      |   1 +
 .../storm/sql/runtime/utils/FieldInfoUtils.java |   4 +-
 .../storm/sql/runtime/utils/SerdeUtils.java     |  41 +++--
 .../apache/storm/sql/runtime/utils/Utils.java   |  21 +--
 37 files changed, 677 insertions(+), 535 deletions(-)
----------------------------------------------------------------------