You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:22:54 UTC

[06/67] [abbrv] kylin git commit: Revert "reformat code"

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
index 0d8daa4..e8a93bd 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
@@ -88,23 +88,19 @@ public class CLIHiveClient implements IHiveClient {
         List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList();
         List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList();
         for (FieldSchema fieldSchema : allFields) {
-            allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(),
-                    fieldSchema.getComment()));
+            allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
         }
         if (partitionFields != null && partitionFields.size() > 0) {
             for (FieldSchema fieldSchema : partitionFields) {
-                partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(),
-                        fieldSchema.getComment()));
+                partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
             }
         }
         builder.setAllColumns(allColumns);
         builder.setPartitionColumns(partitionColumns);
 
         builder.setSdLocation(table.getSd().getLocation());
-        builder.setFileSize(
-                getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
-        builder.setFileNum(
-                getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES));
+        builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
+        builder.setFileNum(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES));
         builder.setIsNative(!MetaStoreUtils.isNonNativeTable(table));
         builder.setTableName(tableName);
         builder.setSdInputFormat(table.getSd().getInputFormat());

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index c907b44..15d4456 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -137,8 +137,7 @@ public class HiveMRInput implements IMRInput {
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                    .getConfig();
+            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
             JobEngineConfig conf = new JobEngineConfig(cubeConfig);
 
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
@@ -169,8 +168,7 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
-                String jobWorkingDir) {
+        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) {
             ShellExecutable step = new ShellExecutable();
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
 
@@ -197,25 +195,21 @@ public class HiveMRInput implements IMRInput {
                 if (lookUpTableDesc.isView()) {
                     StringBuilder createIntermediateTableHql = new StringBuilder();
                     createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediate + ";\n");
-                    createIntermediateTableHql
-                            .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
+                    createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
                     createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n");
-                    createIntermediateTableHql
-                            .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
+                    createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
                     hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
                     hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
                 }
             }
 
-            hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
-                    hiveViewIntermediateTables.length() - 1);
+            hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length() - 1);
 
             step.setCmd(hiveCmdBuilder.build());
             return step;
         }
 
-        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
-                String cubeName) {
+        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) {
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
             final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
             String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
@@ -302,12 +296,10 @@ public class HiveMRInput implements IMRInput {
                 logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
                 if (rowCount == 0) {
                     if (!config.isEmptySegmentAllowed()) {
-                        stepLogger.log("Detect upstream hive table is empty, "
-                                + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+                        stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
                         return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
                     } else {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED,
-                                "Row count is 0, no need to redistribute");
+                        return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
                     }
                 }
 
@@ -384,8 +376,7 @@ public class HiveMRInput implements IMRInput {
                 config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
                 output.append("Hive table " + hiveTable + " is dropped. \n");
                 rmdirOnHDFS(getExternalDataPath());
-                output.append(
-                        "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+                output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
             }
             return output.toString();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index 5fff000..14ed1f9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.source.hive;
 
 import java.io.IOException;
 
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.DFSFileTable;
@@ -77,7 +78,7 @@ public class HiveTable implements IReadableTable {
                 throw new IOException(e);
         }
     }
-
+    
     @Override
     public boolean exists() {
         return true;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
index 089850a..fa9eb29 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
@@ -34,8 +34,7 @@ class HiveTableMeta {
 
         @Override
         public String toString() {
-            return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='"
-                    + comment + '\'' + '}';
+            return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" + comment + '\'' + '}';
         }
     }
 
@@ -53,9 +52,7 @@ class HiveTableMeta {
     List<HiveTableColumnMeta> allColumns;
     List<HiveTableColumnMeta> partitionColumns;
 
-    public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner,
-            String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount,
-            boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
+    public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
         this.tableName = tableName;
         this.sdLocation = sdLocation;
         this.sdInputFormat = sdInputFormat;
@@ -73,10 +70,6 @@ class HiveTableMeta {
 
     @Override
     public String toString() {
-        return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\''
-                + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\''
-                + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime
-                + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns="
-                + allColumns + ", partitionColumns=" + partitionColumns + '}';
+        return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + ", partitionColumns=" + partitionColumns + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
index 6fedd8b..073ded5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
@@ -106,7 +106,6 @@ public class HiveTableMetaBuilder {
     }
 
     public HiveTableMeta createHiveTableMeta() {
-        return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime,
-                fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns);
+        return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime, fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 48e0ee3..75f322f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -143,8 +143,7 @@ public class HiveTableReader implements TableReader {
         return "hive table reader for: " + dbName + "." + tableName;
     }
 
-    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV)
-            throws Exception {
+    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
         HiveConf hiveConf = new HiveConf(HiveTableReader.class);
         Iterator<Entry<String, String>> itr = hiveConf.iterator();
         Map<String, String> map = new HashMap<String, String>();
@@ -157,8 +156,7 @@ public class HiveTableReader implements TableReader {
         if (partitionKV == null || partitionKV.size() == 0) {
             entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
         } else {
-            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV)
-                    .build();
+            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
         }
 
         HCatReader reader = DataTransferFactory.getHCatReader(entity, map);

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
index 9db65042..22bea46 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
@@ -18,11 +18,11 @@
 
 package org.apache.kylin.source.hive;
 
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-
 public interface IHiveClient {
     void executeHQL(String hql) throws CommandNeedRetryException, IOException;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 52730bd..ffd54db 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -71,7 +71,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         ColumnDesc[] columns = tableDesc.getColumns();
         Collection<String[]> valuesCollection = tableInputFormat.parseMapperInput(value);
 
-        for (String[] values : valuesCollection) {
+        for (String[] values: valuesCollection) {
             for (int m = 0; m < columns.length; m++) {
                 String field = columns[m].getName();
                 String fieldValue = values[m];

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 3724ef7..0648960 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -49,8 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
     }
 
     @Override
-    public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context)
-            throws IOException, InterruptedException {
+    public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
         int skey = key.get();
         for (BytesWritable v : values) {
             ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 7179a66..f439ccb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -50,8 +50,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
     public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
 
     @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
-            .withDescription("The hive table name").create("table");
+    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
 
     public HiveColumnCardinalityJob() {
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
index 5f48523..246822c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
@@ -52,8 +52,7 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
     public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
 
     @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
-            .withDescription("The hive table name").create("table");
+    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
 
     private String table;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 21dac70..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -18,10 +18,10 @@
 
 package org.apache.kylin.source.kafka;
 
-import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Map;
+
 /**
  */
 public class DefaultTimeParser extends AbstractTimeParser {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 650f57e..50295c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -73,8 +73,7 @@ public class KafkaConfigManager {
         }
 
         @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
-                throws IOException {
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
             if (event == Event.DROP)
                 removeKafkaConfigLocal(cacheKey);
             else
@@ -218,13 +217,11 @@ public class KafkaConfigManager {
 
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
-        logger.info("Reloading Kafka Metadata from folder "
-                + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
+        logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
 
         kafkaMap.clear();
 
-        List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT,
-                MetadataConstants.FILE_SURFIX);
+        List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
         for (String path : paths) {
             KafkaConfig kafkaConfig;
             try {
@@ -234,8 +231,7 @@ public class KafkaConfigManager {
                 continue;
             }
             if (path.equals(kafkaConfig.getResourcePath()) == false) {
-                logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at "
-                        + kafkaConfig.getResourcePath());
+                logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath());
                 continue;
             }
             if (kafkaMap.containsKey(kafkaConfig.getName())) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 5815d53..3323afb 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -78,14 +78,13 @@ public class KafkaMRInput implements IMRInput {
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
         KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
         KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
-        List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()),
-                new Function<ColumnDesc, TblColRef>() {
-                    @Nullable
-                    @Override
-                    public TblColRef apply(ColumnDesc input) {
-                        return input.getRef();
-                    }
-                });
+        List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
+            @Nullable
+            @Override
+            public TblColRef apply(ColumnDesc input) {
+                return input.getRef();
+            }
+        });
 
         return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
     }
@@ -100,13 +99,11 @@ public class KafkaMRInput implements IMRInput {
         private StreamingParser streamingParser;
         private final JobEngineConfig conf;
 
-        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig,
-                JobEngineConfig conf) {
+        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
             try {
-                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(),
-                        kafkaConfig.getParserProperties(), columns);
+                streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
             } catch (ReflectiveOperationException e) {
                 throw new IllegalArgumentException(e);
             }
@@ -117,8 +114,7 @@ public class KafkaMRInput implements IMRInput {
             job.setInputFormatClass(SequenceFileInputFormat.class);
             String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
             IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
-            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
-                    JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
             try {
                 FileInputFormat.addInputPath(job, new Path(inputPath));
             } catch (IOException e) {
@@ -130,10 +126,10 @@ public class KafkaMRInput implements IMRInput {
         public Collection<String[]> parseMapperInput(Object mapperInput) {
             Text text = (Text) mapperInput;
             ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength());
-            List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer);
+            List<StreamingMessageRow>  streamingMessageRowList = streamingParser.parse(buffer);
             List<String[]> parsedDataCollection = new ArrayList<>();
 
-            for (StreamingMessageRow row : streamingMessageRowList) {
+            for (StreamingMessageRow row: streamingMessageRowList) {
                 parsedDataCollection.add(row.getData().toArray(new String[row.getData().size()]));
             }
 
@@ -162,19 +158,16 @@ public class KafkaMRInput implements IMRInput {
             MapReduceExecutable result = new MapReduceExecutable();
 
             IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
-                    JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
             result.setName("Save data from Kafka");
             result.setMapReduceJobClass(KafkaFlatTableJob.class);
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
             StringBuilder cmd = new StringBuilder();
             jobBuilderSupport.appendMapReduceParameters(cmd);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
-                    seg.getRealization().getName());
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                    "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
             result.setMapReduceParams(cmd.toString());
             return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index fc0d50d..52d2e6f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -32,8 +32,8 @@ import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
-import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -71,25 +71,20 @@ public class KafkaSource implements ISource {
         if (result.getStartOffset() == 0) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
-                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: "
-                        + last.getSourcePartitionOffsetEnd());
+                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
                 // from last seg's end position
                 result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
-            } else if (cube.getDescriptor().getPartitionOffsetStart() != null
-                    && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
-                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: "
-                        + cube.getDescriptor().getPartitionOffsetStart());
+            } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
                 // from the topic's earliest offset;
-                logger.debug(
-                        "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
+                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }
 
-        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getKafkaConfig(cube.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
@@ -113,9 +108,7 @@ public class KafkaSource implements ISource {
             for (Integer partitionId : latestOffsets.keySet()) {
                 if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
                     if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
-                        throw new IllegalArgumentException("Partition " + partitionId + " end offset ("
-                                + latestOffsets.get(partitionId) + ") is smaller than start offset ( "
-                                + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+                        throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
                     }
                 } else {
                     throw new IllegalStateException("New partition added in between, retry.");
@@ -133,8 +126,7 @@ public class KafkaSource implements ISource {
         }
 
         if (totalStartOffset > totalEndOffset) {
-            throw new IllegalArgumentException(
-                    "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+            throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
         }
 
         if (totalStartOffset == totalEndOffset) {
@@ -159,8 +151,7 @@ public class KafkaSource implements ISource {
 
         if (startOffset > 0) {
             if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
-                throw new IllegalArgumentException(
-                        "When 'startOffset' is > 0, need provide each partition's start offset");
+                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -169,15 +160,13 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != startOffset) {
-                throw new IllegalArgumentException(
-                        "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
             }
         }
 
         if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
             if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
-                throw new IllegalArgumentException(
-                        "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -186,8 +175,7 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != endOffset) {
-                throw new IllegalArgumentException(
-                        "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 1459c2d..2e3c11c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -19,20 +19,20 @@
 package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
  */
@@ -68,8 +68,7 @@ public abstract class StreamingParser {
 
     abstract public boolean filter(StreamingMessageRow streamingMessageRow);
 
-    public static StreamingParser getStreamingParser(String parserName, String parserProperties,
-            List<TblColRef> columns) throws ReflectiveOperationException {
+    public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
         if (!StringUtils.isEmpty(parserName)) {
             logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
             Class clazz = Class.forName(parserName);
@@ -77,8 +76,7 @@ public abstract class StreamingParser {
             Constructor constructor = clazz.getConstructor(List.class, Map.class);
             return (StreamingParser) constructor.newInstance(columns, properties);
         } else {
-            throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties "
-                    + parserProperties + ".");
+            throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index beed6f7..de167b4 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -21,14 +21,15 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.TreeMap;
+import java.util.Collections;
+import java.util.Arrays;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ByteBufferBackedInputStream;
 import org.apache.kylin.common.util.StreamingMessageRow;
@@ -36,7 +37,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.MapType;
@@ -68,8 +68,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     private final Map<String, String[]> nameMap = new HashMap<>();
 
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
-            SimpleType.construct(Object.class));
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
 
@@ -89,12 +88,10 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 Constructor constructor = clazz.getConstructor(Map.class);
                 streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
             } catch (Exception e) {
-                throw new IllegalStateException(
-                        "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
+                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
             }
         } else {
-            throw new IllegalStateException(
-                    "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
         }
         mapper = new ObjectMapper();
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -119,8 +116,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 }
             }
 
-            StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t,
-                    Collections.<String, Object> emptyMap());
+            StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t, Collections.<String, Object>emptyMap());
             List<StreamingMessageRow> messageRowList = new ArrayList<StreamingMessageRow>();
             messageRowList.add(streamingMessageRow);
             return messageRowList;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
index 1e75763..fc3bba0 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
@@ -18,15 +18,15 @@
 
 package org.apache.kylin.source.kafka.config;
 
-import java.io.Serializable;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class BrokerConfig implements Serializable {
+public class BrokerConfig implements Serializable{
 
     @JsonProperty("id")
     private int id;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 44be966..afe888f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -39,8 +39,7 @@ import kafka.cluster.Broker;
  */
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class KafkaClusterConfig extends RootPersistentEntity {
-    public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(
-            KafkaClusterConfig.class);
+    public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(KafkaClusterConfig.class);
 
     @JsonProperty("brokers")
     private List<BrokerConfig> brokerConfigs;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index b073921..cc32ed9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -57,8 +57,7 @@ public class KafkaConsumerProperties {
                     KafkaConsumerProperties config = new KafkaConsumerProperties();
                     config.properties = config.loadKafkaConsumerProperties();
 
-                    logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : "
-                            + System.identityHashCode(config));
+                    logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
                     ENV_INSTANCE = config;
                 } catch (IllegalArgumentException e) {
                     throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
@@ -82,14 +81,8 @@ public class KafkaConsumerProperties {
             configNames = ConsumerConfig.configNames();
         } catch (Error e) {
             // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException which is an Error, not Exception
-            String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms,"
-                    + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type,"
-                    + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password,"
-                    + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms,"
-                    + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes,"
-                    + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
-                    + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset")
-                            .split(",");
+            String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+                    + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
             configNames.addAll(Arrays.asList(configNamesArray));
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b3a0f19..11466e5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -97,8 +97,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             String topic = kafkaConfig.getTopic();
 
             if (brokers == null || brokers.length() == 0 || topic == null) {
-                throw new IllegalArgumentException(
-                        "Invalid Kafka information, brokers " + brokers + ", topic " + topic);
+                throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
             }
 
             JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -144,7 +143,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
         job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
         job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
 
-        for (Integer partition : offsetStart.keySet()) {
+        for(Integer partition: offsetStart.keySet()) {
             job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString());
             job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 71f823f..c996c5f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -69,11 +69,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
 
         Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup,
-                kafkaProperties)) {
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
-            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(),
-                    "partition number mismatch with server side");
+            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {
                 final PartitionInfo partition = partitionInfos.get(i);
                 int partitionId = partition.partition();
@@ -82,8 +80,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                 }
 
                 if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId,
-                            startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
+                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
                     splits.add(split);
                 }
             }
@@ -92,8 +89,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
     }
 
     @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
-            throws IOException, InterruptedException {
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
         return new KafkaInputRecordReader();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index c1bb625..c22c72f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -89,15 +89,13 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
         Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
 
-        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup,
-                kafkaProperties);
+        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 
         earliestOffset = this.split.getOffsetStart();
         latestOffset = this.split.getOffsetEnd();
         TopicPartition topicPartition = new TopicPartition(topic, partition);
         consumer.assign(Arrays.asList(topicPartition));
-        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}",
-                new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
+        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
     }
 
     @Override
@@ -122,9 +120,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
             iterator = messages.iterator();
             if (!iterator.hasNext()) {
                 log.info("No more messages, stop");
-                throw new IOException(
-                        String.format("Unexpected ending of stream, expected ending offset %d, but end at %d",
-                                latestOffset, watermark));
+                throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
             }
         }
 
@@ -143,8 +139,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
         }
 
         log.error("Unexpected iterator end.");
-        throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d",
-                latestOffset, watermark));
+        throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
     }
 
     @Override
@@ -167,8 +162,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
     @Override
     public void close() throws IOException {
-        log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition,
-                numProcessedMessages);
+        log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages);
         consumer.close();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
index c8a0110..3261399 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
@@ -72,7 +72,7 @@ public class KafkaInputSplit extends InputSplit implements Writable {
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] { brokers };
+        return new String[]{brokers};
     }
 
     public int getPartition() {
@@ -99,4 +99,4 @@ public class KafkaInputSplit extends InputSplit implements Writable {
     public String toString() {
         return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index d357d91..914fca2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -33,8 +34,6 @@ import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  */
 public class MergeOffsetStep extends AbstractExecutable {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 56d3687..bd8f90e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -17,11 +17,7 @@
 */
 package org.apache.kylin.source.kafka.util;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -34,7 +30,10 @@ import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 
-import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  */
@@ -46,8 +45,7 @@ public class KafkaClient {
         return consumer;
     }
 
-    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup,
-            Properties properties) {
+    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
@@ -99,8 +97,7 @@ public class KafkaClient {
     }
 
     public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getKafkaConfig(cubeInstance.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
@@ -116,9 +113,9 @@ public class KafkaClient {
         return startOffsets;
     }
 
+
     public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getKafkaConfig(cubeInstance.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index fc04f62..4b91e03 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -48,12 +48,9 @@ public class KafkaSampleProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class);
     @SuppressWarnings("static-access")
-    private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true)
-            .withDescription("Kafka topic").create("topic");
-    private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true)
-            .withDescription("Kafka broker").create("broker");
-    private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false)
-            .withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
+    private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
+    private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
+    private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
 
     private static final ObjectMapper mapper = new ObjectMapper();
 
@@ -134,8 +131,7 @@ public class KafkaSampleProducer {
             user.put("age", rnd.nextInt(20) + 10);
             record.put("user", user);
             //send message
-            ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "",
-                    mapper.writeValueAsString(record));
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
             System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record));
             producer.send(data);
             Thread.sleep(interval);

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 2339862..8dc840b 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -45,8 +45,7 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
     private static String[] userNeedColNames;
     private static final String jsonFilePath = "src/test/resources/message.json";
     private static ObjectMapper mapper;
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
-            SimpleType.construct(Object.class));
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     @BeforeClass
     public static void setUp() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index cc94a35..208fdb6 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -52,17 +52,14 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
         KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
         assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
         assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
-        assertEquals("30000",
-                kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
+        assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
     }
 
     @Test
-    public void testLoadKafkaPropertiesAsHadoopJobConf()
-            throws IOException, ParserConfigurationException, SAXException {
+    public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
         KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
         Configuration conf = new Configuration(false);
-        conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())),
-                KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+        conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
         assertEquals("30000", conf.get("session.timeout.ms"));
 
         Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 5b4126f..6580107 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -104,8 +104,7 @@ public class HBaseConnection {
             int coreThreads = config.getHBaseCoreConnectionThreads();
             long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
             LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
-            ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS,
-                    workQueue, //
+            ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
                     Threads.newDaemonThreadFactory("kylin-coproc-"));
             tpe.allowCoreThreadTimeOut(true);
 
@@ -145,8 +144,7 @@ public class HBaseConnection {
     private static Configuration newHBaseConfiguration(StorageURL url) {
         // using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath
         if (!"hbase".equals(url.getScheme()))
-            throw new IllegalArgumentException(
-                    "to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
+            throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
 
         Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
         addHBaseClusterNNHAConfiguration(conf);
@@ -165,7 +163,7 @@ public class HBaseConnection {
         if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
             conf.set("hbase.fs.tmp.dir", "/tmp");
         }
-
+        
         for (Entry<String, String> entry : url.getAllParameters().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
@@ -266,8 +264,7 @@ public class HBaseConnection {
         return tableExists(HBaseConnection.get(hbaseUrl), tableName);
     }
 
-    public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families)
-            throws IOException {
+    public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families) throws IOException {
         createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
     }
 
@@ -280,7 +277,7 @@ public class HBaseConnection {
         TableName tableName = TableName.valueOf(table);
         DistributedLock lock = null;
         String lockPath = getLockPath(table);
-
+        
         try {
             if (tableExists(conn, table)) {
                 logger.debug("HTable '" + table + "' already exists");

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index f5f40a1..a2e0229 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -149,11 +149,9 @@ public class HBaseResourceStore extends ResourceStore {
     @Override
     public String getMetaStoreUUID() throws IOException {
         if (!exists(ResourceStore.METASTORE_UUID_TAG)) {
-            putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0,
-                    StringEntity.serializer);
+            putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0, StringEntity.serializer);
         }
-        StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class,
-                StringEntity.serializer);
+        StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class, StringEntity.serializer);
         return entity.toString();
     }
 
@@ -204,8 +202,7 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     @Override
-    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
-            throws IOException {
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
         FilterList filter = generateTimeFilterList(timeStart, timeEndExclusive);
         final List<RawResource> result = Lists.newArrayList();
         try {
@@ -229,13 +226,11 @@ public class HBaseResourceStore extends ResourceStore {
     private FilterList generateTimeFilterList(long timeStart, long timeEndExclusive) {
         FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
         if (timeStart != Long.MIN_VALUE) {
-            SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
-                    CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart));
+            SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart));
             filterList.addFilter(timeStartFilter);
         }
         if (timeEndExclusive != Long.MAX_VALUE) {
-            SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
-                    CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive));
+            SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive));
             filterList.addFilter(timeEndFilter);
         }
         return filterList.getFilters().size() == 0 ? null : filterList;
@@ -296,8 +291,7 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     @Override
-    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
-            throws IOException, IllegalStateException {
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
         Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
@@ -305,12 +299,10 @@ public class HBaseResourceStore extends ResourceStore {
             Put put = buildPut(resPath, newTS, row, content, table);
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS
-                    + ", operation result: " + ok);
+            logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok);
             if (!ok) {
                 long real = getResourceTimestampImpl(resPath);
-                throw new IllegalStateException(
-                        "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
             }
 
             return newTS;
@@ -363,8 +355,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     }
 
-    private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp)
-            throws IOException {
+    private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
         byte[] rowkey = Bytes.toBytes(path);
 
         Get get = new Get(rowkey);
@@ -409,8 +400,7 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
-        int kvSizeLimit = Integer
-                .parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
+        int kvSizeLimit = Integer.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
         if (content.length > kvSizeLimit) {
             writeLargeCellToHdfs(resPath, content, table);
             content = BytesUtil.EMPTY_BYTE_ARRAY;

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 0d44adc..fc6f878 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -50,16 +50,14 @@ public class HBaseStorage implements IStorage {
             CubeInstance cubeInstance = (CubeInstance) realization;
             String cubeStorageQuery;
             if (cubeInstance.getStorageType() == IStorageAware.ID_HBASE) {//v2 query engine cannot go with v1 storage now
-                throw new IllegalStateException(
-                        "Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
+                throw new IllegalStateException("Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
             } else {
                 cubeStorageQuery = v2CubeStorageQuery;//by default use v2
             }
 
             IStorageQuery ret;
             try {
-                ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class)
-                        .newInstance((CubeInstance) realization);
+                ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
             } catch (Exception e) {
                 throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
             }
@@ -72,13 +70,11 @@ public class HBaseStorage implements IStorage {
 
     private static TblColRef getPartitionCol(IRealization realization) {
         String modelName = realization.getModel().getName();
-        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getDataModelDesc(modelName);
+        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
         PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
         Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
         TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
-        Preconditions.checkArgument(partitionColRef != null,
-                "getPartitionDateColumnRef for " + realization + " is null");
+        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
         return partitionColRef;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
index 4d69925..25abdfb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
@@ -124,8 +124,7 @@ public class AggrKey implements Comparable<AggrKey> {
             return comp;
 
         for (int i = 0; i < groupByMaskSet.length; i++) {
-            comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]],
-                    o.data[o.offset + groupByMaskSet[i]]);
+            comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], o.data[o.offset + groupByMaskSet[i]]);
             if (comp != 0)
                 return comp;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
index 386564a..2a85894 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
@@ -77,16 +77,13 @@ public abstract class AggregationCache {
         int size = aggBufMap.size();
         long memUsage = (40L + rowMemBytes) * size;
         if (memUsage > MEMORY_USAGE_CAP) {
-            throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * "
-                    + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+            throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
         }
 
         //If less than 5% of max memory
         long avail = MemoryBudgetController.getSystemAvailBytes();
         if (avail < (MEMOERY_MAX_BYTES / 20)) {
-            throw new RuntimeException(
-                    "Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is "
-                            + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
+            throw new RuntimeException("Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
index 2b3b91b..63e3bdb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
@@ -33,8 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
  */
 public class CoprocessorFilter {
 
-    public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter,
-            FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) {
+    public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter, FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) {
         // translate constants into dictionary IDs via a serialize copy
         FilterDecorator filterDecorator = new FilterDecorator(dimEncMap, filterConstantsTreatment);
         byte[] bytes = TupleFilterSerializer.serialize(rootFilter, filterDecorator, DictCodeSystem.INSTANCE);
@@ -44,8 +43,7 @@ public class CoprocessorFilter {
     }
 
     public static byte[] serialize(CoprocessorFilter o) {
-        return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY
-                : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE);
+        return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE);
     }
 
     public static CoprocessorFilter deserialize(byte[] filterBytes) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 65215f6..f6332f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -36,8 +36,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class CoprocessorProjector {
 
-    public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid,
-            final Collection<TblColRef> dimensionColumns) {
+    public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid, final Collection<TblColRef> dimensionColumns) {
 
         RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
             @Override
@@ -46,8 +45,7 @@ public class CoprocessorProjector {
             }
 
             @Override
-            protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue,
-                    int outputValueOffset) {
+            protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue, int outputValueOffset) {
                 byte bits = dimensionColumns.contains(column) ? (byte) 0xff : 0x00;
                 Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, bits);
             }
@@ -56,6 +54,7 @@ public class CoprocessorProjector {
         byte[] mask = rowKeyMaskEncoder.encode(new String[cuboid.getColumns().size()]);
         return new CoprocessorProjector(mask, dimensionColumns.size() != 0);
     }
+  
 
     public static byte[] serialize(CoprocessorProjector o) {
         ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);