Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:22:54 UTC
[06/67] [abbrv] kylin git commit: Revert "reformat code"
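Throughout the diff below the pattern is uniform: statements that an earlier formatter pass had wrapped across lines (the "-" lines) are restored to their original single-line form (the "+" lines), and a handful of imports return to their pre-format order. A minimal illustration of the wrapped and restored forms, taken from the first hunk:

    // wrapped by the formatter (removed by this revert):
    builder.setFileSize(
            getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
    // single-line form (restored by this revert):
    builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));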
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
index 0d8daa4..e8a93bd 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
@@ -88,23 +88,19 @@ public class CLIHiveClient implements IHiveClient {
List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList();
List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList();
for (FieldSchema fieldSchema : allFields) {
- allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(),
- fieldSchema.getComment()));
+ allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
}
if (partitionFields != null && partitionFields.size() > 0) {
for (FieldSchema fieldSchema : partitionFields) {
- partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(),
- fieldSchema.getComment()));
+ partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
}
}
builder.setAllColumns(allColumns);
builder.setPartitionColumns(partitionColumns);
builder.setSdLocation(table.getSd().getLocation());
- builder.setFileSize(
- getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
- builder.setFileNum(
- getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES));
+ builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
+ builder.setFileNum(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES));
builder.setIsNative(!MetaStoreUtils.isNonNativeTable(table));
builder.setTableName(tableName);
builder.setSdInputFormat(table.getSd().getInputFormat());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index c907b44..15d4456 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -137,8 +137,7 @@ public class HiveMRInput implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
- .getConfig();
+ final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
JobEngineConfig conf = new JobEngineConfig(cubeConfig);
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
@@ -169,8 +168,7 @@ public class HiveMRInput implements IMRInput {
return step;
}
- private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
- String jobWorkingDir) {
+ private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -197,25 +195,21 @@ public class HiveMRInput implements IMRInput {
if (lookUpTableDesc.isView()) {
StringBuilder createIntermediateTableHql = new StringBuilder();
createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediate + ";\n");
- createIntermediateTableHql
- .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
+ createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n");
- createIntermediateTableHql
- .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
+ createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
}
}
- hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
- hiveViewIntermediateTables.length() - 1);
+ hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length() - 1);
step.setCmd(hiveCmdBuilder.build());
return step;
}
- private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
- String cubeName) {
+ private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) {
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
@@ -302,12 +296,10 @@ public class HiveMRInput implements IMRInput {
logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
if (rowCount == 0) {
if (!config.isEmptySegmentAllowed()) {
- stepLogger.log("Detect upstream hive table is empty, "
- + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+ stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
} else {
- return new ExecuteResult(ExecuteResult.State.SUCCEED,
- "Row count is 0, no need to redistribute");
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
}
}
@@ -384,8 +376,7 @@ public class HiveMRInput implements IMRInput {
config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
output.append("Hive table " + hiveTable + " is dropped. \n");
rmdirOnHDFS(getExternalDataPath());
- output.append(
- "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+ output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
}
return output.toString();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index 5fff000..14ed1f9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.mr.DFSFileTable;
@@ -77,7 +78,7 @@ public class HiveTable implements IReadableTable {
throw new IOException(e);
}
}
-
+
@Override
public boolean exists() {
return true;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
index 089850a..fa9eb29 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
@@ -34,8 +34,7 @@ class HiveTableMeta {
@Override
public String toString() {
- return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='"
- + comment + '\'' + '}';
+ return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" + comment + '\'' + '}';
}
}
@@ -53,9 +52,7 @@ class HiveTableMeta {
List<HiveTableColumnMeta> allColumns;
List<HiveTableColumnMeta> partitionColumns;
- public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner,
- String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount,
- boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
+ public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
this.tableName = tableName;
this.sdLocation = sdLocation;
this.sdInputFormat = sdInputFormat;
@@ -73,10 +70,6 @@ class HiveTableMeta {
@Override
public String toString() {
- return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\''
- + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\''
- + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime
- + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns="
- + allColumns + ", partitionColumns=" + partitionColumns + '}';
+ return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + ", partitionColumns=" + partitionColumns + '}';
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
index 6fedd8b..073ded5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
@@ -106,7 +106,6 @@ public class HiveTableMetaBuilder {
}
public HiveTableMeta createHiveTableMeta() {
- return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime,
- fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns);
+ return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime, fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 48e0ee3..75f322f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -143,8 +143,7 @@ public class HiveTableReader implements TableReader {
return "hive table reader for: " + dbName + "." + tableName;
}
- private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV)
- throws Exception {
+ private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
HiveConf hiveConf = new HiveConf(HiveTableReader.class);
Iterator<Entry<String, String>> itr = hiveConf.iterator();
Map<String, String> map = new HashMap<String, String>();
@@ -157,8 +156,7 @@ public class HiveTableReader implements TableReader {
if (partitionKV == null || partitionKV.size() == 0) {
entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
} else {
- entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV)
- .build();
+ entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
}
HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
index 9db65042..22bea46 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
@@ -18,11 +18,11 @@
package org.apache.kylin.source.hive;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-
public interface IHiveClient {
void executeHQL(String hql) throws CommandNeedRetryException, IOException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 52730bd..ffd54db 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -71,7 +71,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
ColumnDesc[] columns = tableDesc.getColumns();
Collection<String[]> valuesCollection = tableInputFormat.parseMapperInput(value);
- for (String[] values : valuesCollection) {
+ for (String[] values: valuesCollection) {
for (int m = 0; m < columns.length; m++) {
String field = columns[m].getName();
String fieldValue = values[m];
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 3724ef7..0648960 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -49,8 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
}
@Override
- public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context)
- throws IOException, InterruptedException {
+ public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
int skey = key.get();
for (BytesWritable v : values) {
ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 7179a66..f439ccb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -50,8 +50,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
@SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
- .withDescription("The hive table name").create("table");
+ protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
public HiveColumnCardinalityJob() {
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
index 5f48523..246822c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
@@ -52,8 +52,7 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
@SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
- .withDescription("The hive table name").create("table");
+ protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
private String table;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 21dac70..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -18,10 +18,10 @@
package org.apache.kylin.source.kafka;
-import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
+
/**
*/
public class DefaultTimeParser extends AbstractTimeParser {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 650f57e..50295c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -73,8 +73,7 @@ public class KafkaConfigManager {
}
@Override
- public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
- throws IOException {
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
if (event == Event.DROP)
removeKafkaConfigLocal(cacheKey);
else
@@ -218,13 +217,11 @@ public class KafkaConfigManager {
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
- logger.info("Reloading Kafka Metadata from folder "
- + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
+ logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
kafkaMap.clear();
- List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT,
- MetadataConstants.FILE_SURFIX);
+ List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
for (String path : paths) {
KafkaConfig kafkaConfig;
try {
@@ -234,8 +231,7 @@ public class KafkaConfigManager {
continue;
}
if (path.equals(kafkaConfig.getResourcePath()) == false) {
- logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at "
- + kafkaConfig.getResourcePath());
+ logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath());
continue;
}
if (kafkaMap.containsKey(kafkaConfig.getName())) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 5815d53..3323afb 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -78,14 +78,13 @@ public class KafkaMRInput implements IMRInput {
public IMRTableInputFormat getTableInputFormat(TableDesc table) {
KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
- List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()),
- new Function<ColumnDesc, TblColRef>() {
- @Nullable
- @Override
- public TblColRef apply(ColumnDesc input) {
- return input.getRef();
- }
- });
+ List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
+ @Nullable
+ @Override
+ public TblColRef apply(ColumnDesc input) {
+ return input.getRef();
+ }
+ });
return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
}
@@ -100,13 +99,11 @@ public class KafkaMRInput implements IMRInput {
private StreamingParser streamingParser;
private final JobEngineConfig conf;
- public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig,
- JobEngineConfig conf) {
+ public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
this.cubeSegment = cubeSegment;
this.conf = conf;
try {
- streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(),
- kafkaConfig.getParserProperties(), columns);
+ streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException(e);
}
@@ -117,8 +114,7 @@ public class KafkaMRInput implements IMRInput {
job.setInputFormatClass(SequenceFileInputFormat.class);
String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
- String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
- JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
try {
FileInputFormat.addInputPath(job, new Path(inputPath));
} catch (IOException e) {
@@ -130,10 +126,10 @@ public class KafkaMRInput implements IMRInput {
public Collection<String[]> parseMapperInput(Object mapperInput) {
Text text = (Text) mapperInput;
ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength());
- List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer);
+ List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer);
List<String[]> parsedDataCollection = new ArrayList<>();
- for (StreamingMessageRow row : streamingMessageRowList) {
+ for (StreamingMessageRow row: streamingMessageRowList) {
parsedDataCollection.add(row.getData().toArray(new String[row.getData().size()]));
}
@@ -162,19 +158,16 @@ public class KafkaMRInput implements IMRInput {
MapReduceExecutable result = new MapReduceExecutable();
IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
- outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
- JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
result.setName("Save data from Kafka");
result.setMapReduceJobClass(KafkaFlatTableJob.class);
JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
StringBuilder cmd = new StringBuilder();
jobBuilderSupport.appendMapReduceParameters(cmd);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
- seg.getRealization().getName());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
- "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index fc0d50d..52d2e6f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -32,8 +32,8 @@ import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
-import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -71,25 +71,20 @@ public class KafkaSource implements ISource {
if (result.getStartOffset() == 0) {
final CubeSegment last = cube.getLastSegment();
if (last != null) {
- logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: "
- + last.getSourcePartitionOffsetEnd());
+ logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
// from last seg's end position
result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
- } else if (cube.getDescriptor().getPartitionOffsetStart() != null
- && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
- logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: "
- + cube.getDescriptor().getPartitionOffsetStart());
+ } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+ logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
} else {
// from the topic's earliest offset;
- logger.debug(
- "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
+ logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
}
}
- final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getKafkaConfig(cube.getRootFactTable());
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
@@ -113,9 +108,7 @@ public class KafkaSource implements ISource {
for (Integer partitionId : latestOffsets.keySet()) {
if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
- throw new IllegalArgumentException("Partition " + partitionId + " end offset ("
- + latestOffsets.get(partitionId) + ") is smaller than start offset ( "
- + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+ throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
}
} else {
throw new IllegalStateException("New partition added in between, retry.");
@@ -133,8 +126,7 @@ public class KafkaSource implements ISource {
}
if (totalStartOffset > totalEndOffset) {
- throw new IllegalArgumentException(
- "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+ throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
}
if (totalStartOffset == totalEndOffset) {
@@ -159,8 +151,7 @@ public class KafkaSource implements ISource {
if (startOffset > 0) {
if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
- throw new IllegalArgumentException(
- "When 'startOffset' is > 0, need provide each partition's start offset");
+ throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
}
long totalOffset = 0;
@@ -169,15 +160,13 @@ public class KafkaSource implements ISource {
}
if (totalOffset != startOffset) {
- throw new IllegalArgumentException(
- "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
}
}
if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
- throw new IllegalArgumentException(
- "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+ throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
}
long totalOffset = 0;
@@ -186,8 +175,7 @@ public class KafkaSource implements ISource {
}
if (totalOffset != endOffset) {
- throw new IllegalArgumentException(
- "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 1459c2d..2e3c11c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -19,20 +19,20 @@
package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.StreamingMessageRow;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
/**
* By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
*/
@@ -68,8 +68,7 @@ public abstract class StreamingParser {
abstract public boolean filter(StreamingMessageRow streamingMessageRow);
- public static StreamingParser getStreamingParser(String parserName, String parserProperties,
- List<TblColRef> columns) throws ReflectiveOperationException {
+ public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
if (!StringUtils.isEmpty(parserName)) {
logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
Class clazz = Class.forName(parserName);
@@ -77,8 +76,7 @@ public abstract class StreamingParser {
Constructor constructor = clazz.getConstructor(List.class, Map.class);
return (StreamingParser) constructor.newInstance(columns, properties);
} else {
- throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties "
- + parserProperties + ".");
+ throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
}
}
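getStreamingParser above loads the parser class by name and instantiates it reflectively through clazz.getConstructor(List.class, Map.class), matching the constructor convention noted in the class comment. A minimal sketch of a custom parser satisfying that convention — the class itself and its trivial method bodies are hypothetical, and it assumes parse(ByteBuffer) and filter(StreamingMessageRow) are the abstract methods, as used elsewhere in this diff:

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kylin.common.util.StreamingMessageRow;
    import org.apache.kylin.metadata.model.TblColRef;

    public class NoOpStreamingParser extends StreamingParser {

        // Required (List, Map) constructor; the reflection in getStreamingParser looks this up.
        public NoOpStreamingParser(List<TblColRef> allColumns, Map properties) {
        }

        @Override
        public List<StreamingMessageRow> parse(ByteBuffer buffer) {
            return Collections.emptyList(); // a real parser decodes the message bytes here
        }

        @Override
        public boolean filter(StreamingMessageRow streamingMessageRow) {
            return true; // accept every row
        }
    }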
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index beed6f7..de167b4 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -21,14 +21,15 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.HashMap;
import java.util.TreeMap;
+import java.util.Collections;
+import java.util.Arrays;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.StreamingMessageRow;
@@ -36,7 +37,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
@@ -68,8 +68,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final Map<String, String[]> nameMap = new HashMap<>();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
- SimpleType.construct(Object.class));
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
private AbstractTimeParser streamTimeParser;
@@ -89,12 +88,10 @@ public final class TimedJsonStreamParser extends StreamingParser {
Constructor constructor = clazz.getConstructor(Map.class);
streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
} catch (Exception e) {
- throw new IllegalStateException(
- "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
}
} else {
- throw new IllegalStateException(
- "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
}
mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -119,8 +116,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
}
- StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t,
- Collections.<String, Object> emptyMap());
+ StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t, Collections.<String, Object>emptyMap());
List<StreamingMessageRow> messageRowList = new ArrayList<StreamingMessageRow>();
messageRowList.add(streamingMessageRow);
return messageRowList;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
index 1e75763..fc3bba0 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java
@@ -18,15 +18,15 @@
package org.apache.kylin.source.kafka.config;
-import java.io.Serializable;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+
/**
*/
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class BrokerConfig implements Serializable {
+public class BrokerConfig implements Serializable{
@JsonProperty("id")
private int id;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 44be966..afe888f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -39,8 +39,7 @@ import kafka.cluster.Broker;
*/
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class KafkaClusterConfig extends RootPersistentEntity {
- public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(
- KafkaClusterConfig.class);
+ public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(KafkaClusterConfig.class);
@JsonProperty("brokers")
private List<BrokerConfig> brokerConfigs;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index b073921..cc32ed9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -57,8 +57,7 @@ public class KafkaConsumerProperties {
KafkaConsumerProperties config = new KafkaConsumerProperties();
config.properties = config.loadKafkaConsumerProperties();
- logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : "
- + System.identityHashCode(config));
+ logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
ENV_INSTANCE = config;
} catch (IllegalArgumentException e) {
throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
@@ -82,14 +81,8 @@ public class KafkaConsumerProperties {
configNames = ConsumerConfig.configNames();
} catch (Error e) {
// the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException which is an Error, not Exception
- String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms,"
- + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type,"
- + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password,"
- + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms,"
- + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes,"
- + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
- + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset")
- .split(",");
+ String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+ + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
configNames.addAll(Arrays.asList(configNamesArray));
}
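The hunk above preserves a compatibility fallback: ConsumerConfig.configNames() only exists on Kafka clients 0.10.1.0 and later, so on older clients its absence surfaces as a NoSuchMethodError (an Error, not an Exception) and the code substitutes a hard-coded key list. A minimal standalone sketch of the same pattern — the wrapper class and the shortened fallback list are illustrative:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class ConsumerConfigNames {
        static Set<String> get() {
            try {
                return ConsumerConfig.configNames(); // supported on Kafka 0.10.1.0+ only
            } catch (Error e) { // NoSuchMethodError on older clients
                // fall back to a known subset of consumer config keys
                return new HashSet<>(Arrays.asList("bootstrap.servers", "group.id",
                        "key.deserializer", "value.deserializer"));
            }
        }
    }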
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b3a0f19..11466e5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -97,8 +97,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
String topic = kafkaConfig.getTopic();
if (brokers == null || brokers.length() == 0 || topic == null) {
- throw new IllegalArgumentException(
- "Invalid Kafka information, brokers " + brokers + ", topic " + topic);
+ throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
}
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -144,7 +143,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
- for (Integer partition : offsetStart.keySet()) {
+ for(Integer partition: offsetStart.keySet()) {
job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString());
job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 71f823f..c996c5f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -69,11 +69,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
final List<InputSplit> splits = new ArrayList<InputSplit>();
- try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup,
- kafkaProperties)) {
+ try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
- Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(),
- "partition number mismatch with server side");
+ Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
for (int i = 0; i < partitionInfos.size(); i++) {
final PartitionInfo partition = partitionInfos.get(i);
int partitionId = partition.partition();
@@ -82,8 +80,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
- InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId,
- startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
+ InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
splits.add(split);
}
}
@@ -92,8 +89,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
@Override
- public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
- throws IOException, InterruptedException {
+ public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
return new KafkaInputRecordReader();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index c1bb625..c22c72f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -89,15 +89,13 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
- consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup,
- kafkaProperties);
+ consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
earliestOffset = this.split.getOffsetStart();
latestOffset = this.split.getOffsetEnd();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Arrays.asList(topicPartition));
- log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}",
- new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
+ log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
}
@Override
@@ -122,9 +120,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
iterator = messages.iterator();
if (!iterator.hasNext()) {
log.info("No more messages, stop");
- throw new IOException(
- String.format("Unexpected ending of stream, expected ending offset %d, but end at %d",
- latestOffset, watermark));
+ throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
}
}
@@ -143,8 +139,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
}
log.error("Unexpected iterator end.");
- throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d",
- latestOffset, watermark));
+ throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
}
@Override
@@ -167,8 +162,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
@Override
public void close() throws IOException {
- log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition,
- numProcessedMessages);
+ log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages);
consumer.close();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
index c8a0110..3261399 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
@@ -72,7 +72,7 @@ public class KafkaInputSplit extends InputSplit implements Writable {
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return new String[] { brokers };
+ return new String[]{brokers};
}
public int getPartition() {
@@ -99,4 +99,4 @@ public class KafkaInputSplit extends InputSplit implements Writable {
public String toString() {
return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index d357d91..914fca2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -33,8 +34,6 @@ import org.apache.kylin.job.execution.ExecuteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
*/
public class MergeOffsetStep extends AbstractExecutable {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 56d3687..bd8f90e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -17,11 +17,7 @@
*/
package org.apache.kylin.source.kafka.util;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -34,7 +30,10 @@ import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
*/
@@ -46,8 +45,7 @@ public class KafkaClient {
return consumer;
}
- private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup,
- Properties properties) {
+ private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
Properties props = new Properties();
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
@@ -99,8 +97,7 @@ public class KafkaClient {
}
public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
- final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getKafkaConfig(cubeInstance.getRootFactTable());
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
@@ -116,9 +113,9 @@ public class KafkaClient {
return startOffsets;
}
+
public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
- final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getKafkaConfig(cubeInstance.getRootFactTable());
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
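
For context, getLatestOffsets and getEarliestOffsets above share one pattern: resolve the cube's KafkaConfig, then probe every partition of the topic. A minimal sketch of the probing half using the plain 0.10.x consumer API (broker list, topic, and group id are placeholders; Kylin's own helpers are omitted):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    class OffsetProbeSketch {
        static Map<Integer, Long> latestOffsets(String brokers, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("group.id", "offset-probe");  // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Map<Integer, Long> offsets = new HashMap<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    TopicPartition tp = new TopicPartition(topic, p.partition());
                    consumer.assign(Collections.singletonList(tp));
                    consumer.seekToEnd(Collections.singletonList(tp));  // seekToBeginning(...) for earliest
                    offsets.put(p.partition(), consumer.position(tp));  // position() resolves the seek
                }
            }
            return offsets;
        }
    }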
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index fc04f62..4b91e03 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -48,12 +48,9 @@ public class KafkaSampleProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class);
@SuppressWarnings("static-access")
- private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true)
- .withDescription("Kafka topic").create("topic");
- private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true)
- .withDescription("Kafka broker").create("broker");
- private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false)
- .withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
+ private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
+ private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
+ private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in milliseconds, default 1000").create("interval");
private static final ObjectMapper mapper = new ObjectMapper();
@@ -134,8 +131,7 @@ public class KafkaSampleProducer {
user.put("age", rnd.nextInt(20) + 10);
record.put("user", user);
//send message
- ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "",
- mapper.writeValueAsString(record));
+ ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record));
producer.send(data);
Thread.sleep(interval);
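
For context, stripped of the CLI parsing and the random record generation, the send loop above is the stock producer pattern; a minimal standalone sketch (broker address and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Key is the send timestamp, value is the JSON payload, as in the sample producer.
                // "test_topic" is a placeholder topic name.
                producer.send(new ProducerRecord<>("test_topic",
                        System.currentTimeMillis() + "", "{\"user\":{\"age\":25}}"));
            }
        }
    }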
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 2339862..8dc840b 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -45,8 +45,7 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
private static String[] userNeedColNames;
private static final String jsonFilePath = "src/test/resources/message.json";
private static ObjectMapper mapper;
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
- SimpleType.construct(Object.class));
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
@BeforeClass
public static void setUp() throws Exception {
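
For context, the mapType field above builds a Jackson JavaType describing HashMap<String, Object>. The same type is more commonly obtained through TypeFactory; a small sketch of the equivalent (not taken from this test):

    import java.io.IOException;
    import java.util.HashMap;
    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class MapTypeSketch {
        static HashMap<String, Object> parse(String json) throws IOException {
            ObjectMapper mapper = new ObjectMapper();
            // Equivalent to MapType.construct(HashMap.class, String, Object) in the test.
            JavaType mapType = mapper.getTypeFactory()
                    .constructMapType(HashMap.class, String.class, Object.class);
            return mapper.readValue(json, mapType);
        }
    }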
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index cc94a35..208fdb6 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -52,17 +52,14 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
- assertEquals("30000",
- kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
+ assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
}
@Test
- public void testLoadKafkaPropertiesAsHadoopJobConf()
- throws IOException, ParserConfigurationException, SAXException {
+ public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
Configuration conf = new Configuration(false);
- conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())),
- KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+ conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
assertEquals("30000", conf.get("session.timeout.ms"));
Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
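
For context, the second test leans on Hadoop's Configuration.addResource(InputStream, String) overload to load the kafka-consumer XML as a job conf. The loading pattern in isolation (the file path is a placeholder):

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class ConfLoadSketch {
        static String sessionTimeout(String xmlPath) throws IOException {
            Configuration conf = new Configuration(false);  // false: skip Hadoop's default resources
            conf.addResource(new FileInputStream(xmlPath), "kafka-consumer.xml");
            return conf.get("session.timeout.ms");          // "30000" in the test fixture
        }
    }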
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 5b4126f..6580107 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -104,8 +104,7 @@ public class HBaseConnection {
int coreThreads = config.getHBaseCoreConnectionThreads();
long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS,
- workQueue, //
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
Threads.newDaemonThreadFactory("kylin-coproc-"));
tpe.allowCoreThreadTimeOut(true);
@@ -145,8 +144,7 @@ public class HBaseConnection {
private static Configuration newHBaseConfiguration(StorageURL url) {
// using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath
if (!"hbase".equals(url.getScheme()))
- throw new IllegalArgumentException(
- "to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
+ throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
addHBaseClusterNNHAConfiguration(conf);
@@ -165,7 +163,7 @@ public class HBaseConnection {
if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
conf.set("hbase.fs.tmp.dir", "/tmp");
}
-
+
for (Entry<String, String> entry : url.getAllParameters().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
@@ -266,8 +264,7 @@ public class HBaseConnection {
return tableExists(HBaseConnection.get(hbaseUrl), tableName);
}
- public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families)
- throws IOException {
+ public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families) throws IOException {
createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
}
@@ -280,7 +277,7 @@ public class HBaseConnection {
TableName tableName = TableName.valueOf(table);
DistributedLock lock = null;
String lockPath = getLockPath(table);
-
+
try {
if (tableExists(conn, table)) {
logger.debug("HTable '" + table + "' already exists");
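
For context, the coprocessor pool above is a plain JDK executor; note that with a LinkedBlockingQueue of capacity maxThreads * 100, the pool only grows past coreThreads once that queue fills. A standalone sketch of the same setup (sizes are caller-supplied, and the JDK factory stands in for HBase's daemon thread factory):

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class CoprocPoolSketch {
        static ThreadPoolExecutor newPool(int coreThreads, int maxThreads, long keepAliveSeconds) {
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxThreads * 100);
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads,
                    keepAliveSeconds, TimeUnit.SECONDS, workQueue,
                    Executors.defaultThreadFactory());  // Kylin uses Threads.newDaemonThreadFactory("kylin-coproc-")
            tpe.allowCoreThreadTimeOut(true);           // let idle core threads exit too
            return tpe;
        }
    }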
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index f5f40a1..a2e0229 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -149,11 +149,9 @@ public class HBaseResourceStore extends ResourceStore {
@Override
public String getMetaStoreUUID() throws IOException {
if (!exists(ResourceStore.METASTORE_UUID_TAG)) {
- putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0,
- StringEntity.serializer);
+ putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0, StringEntity.serializer);
}
- StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class,
- StringEntity.serializer);
+ StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class, StringEntity.serializer);
return entity.toString();
}
@@ -204,8 +202,7 @@ public class HBaseResourceStore extends ResourceStore {
}
@Override
- protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
- throws IOException {
+ protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
FilterList filter = generateTimeFilterList(timeStart, timeEndExclusive);
final List<RawResource> result = Lists.newArrayList();
try {
@@ -229,13 +226,11 @@ public class HBaseResourceStore extends ResourceStore {
private FilterList generateTimeFilterList(long timeStart, long timeEndExclusive) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if (timeStart != Long.MIN_VALUE) {
- SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
- CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart));
+ SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart));
filterList.addFilter(timeStartFilter);
}
if (timeEndExclusive != Long.MAX_VALUE) {
- SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
- CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive));
+ SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive));
filterList.addFilter(timeEndFilter);
}
return filterList.getFilters().size() == 0 ? null : filterList;
@@ -296,8 +291,7 @@ public class HBaseResourceStore extends ResourceStore {
}
@Override
- protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
- throws IOException, IllegalStateException {
+ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
@@ -305,12 +299,10 @@ public class HBaseResourceStore extends ResourceStore {
Put put = buildPut(resPath, newTS, row, content, table);
boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
- logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS
- + ", operation result: " + ok);
+ logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok);
if (!ok) {
long real = getResourceTimestampImpl(resPath);
- throw new IllegalStateException(
- "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+ throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
}
return newTS;
@@ -363,8 +355,7 @@ public class HBaseResourceStore extends ResourceStore {
}
- private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp)
- throws IOException {
+ private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
byte[] rowkey = Bytes.toBytes(path);
Get get = new Get(rowkey);
@@ -409,8 +400,7 @@ public class HBaseResourceStore extends ResourceStore {
}
private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
- int kvSizeLimit = Integer
- .parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
+ int kvSizeLimit = Integer.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
if (content.length > kvSizeLimit) {
writeLargeCellToHdfs(resPath, content, table);
content = BytesUtil.EMPTY_BYTE_ARRAY;
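
For context, checkAndPutResourceImpl above is optimistic concurrency on a timestamp column, built on HBase's atomic compare-and-set. The primitive in isolation (HBase 1.x client API; row, family, and qualifier names are placeholders):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    class CheckAndPutSketch {
        static void bumpTimestamp(Table table, String resPath, long oldTS, long newTS) throws IOException {
            byte[] row = Bytes.toBytes(resPath);
            byte[] family = Bytes.toBytes("f"), column = Bytes.toBytes("ts");  // placeholder names
            Put put = new Put(row);
            put.addColumn(family, column, Bytes.toBytes(newTS));
            // Writes only if the stored timestamp still equals oldTS; false means someone raced us.
            if (!table.checkAndPut(row, family, column, Bytes.toBytes(oldTS), put)) {
                throw new IllegalStateException("Overwriting conflict " + resPath);
            }
        }
    }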
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 0d44adc..fc6f878 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -50,16 +50,14 @@ public class HBaseStorage implements IStorage {
CubeInstance cubeInstance = (CubeInstance) realization;
String cubeStorageQuery;
if (cubeInstance.getStorageType() == IStorageAware.ID_HBASE) {//v2 query engine cannot go with v1 storage now
- throw new IllegalStateException(
- "Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
+ throw new IllegalStateException("Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
} else {
cubeStorageQuery = v2CubeStorageQuery;//by default use v2
}
IStorageQuery ret;
try {
- ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class)
- .newInstance((CubeInstance) realization);
+ ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
}
@@ -72,13 +70,11 @@ public class HBaseStorage implements IStorage {
private static TblColRef getPartitionCol(IRealization realization) {
String modelName = realization.getModel().getName();
- DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getDataModelDesc(modelName);
+ DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
- Preconditions.checkArgument(partitionColRef != null,
- "getPartitionDateColumnRef for " + realization + " is null");
+ Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
return partitionColRef;
}
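
For context, the storage query above is bound late: a class name resolved reflectively against a one-argument constructor, so the v2 query implementation can be swapped by configuration. The idiom in isolation (Query and Cube are stand-in types for this sketch, not Kylin's):

    class ReflectiveBindSketch {
        interface Query { }
        static class Cube { }

        // Resolve the class, find its Cube constructor, invoke it, wrap any failure.
        static Query newQuery(String className, Cube cube) {
            try {
                return (Query) Class.forName(className).getConstructor(Cube.class).newInstance(cube);
            } catch (Exception e) {
                throw new RuntimeException("Failed to initialize storage query for " + className, e);
            }
        }
    }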
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
index 4d69925..25abdfb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java
@@ -124,8 +124,7 @@ public class AggrKey implements Comparable<AggrKey> {
return comp;
for (int i = 0; i < groupByMaskSet.length; i++) {
- comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]],
- o.data[o.offset + groupByMaskSet[i]]);
+ comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], o.data[o.offset + groupByMaskSet[i]]);
if (comp != 0)
return comp;
}
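
For context, compareByteUnsigned orders Java's signed bytes as if they were 0..255; the standard idiom looks like this (Kylin's BytesUtil may differ in detail):

    class UnsignedCompareSketch {
        // Mask to int so 0x80..0xFF sort above 0x00..0x7F instead of below.
        static int compareByteUnsigned(byte a, byte b) {
            return (a & 0xFF) - (b & 0xFF);
        }
    }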
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
index 386564a..2a85894 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java
@@ -77,16 +77,13 @@ public abstract class AggregationCache {
int size = aggBufMap.size();
long memUsage = (40L + rowMemBytes) * size;
if (memUsage > MEMORY_USAGE_CAP) {
- throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * "
- + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
+ throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor.");
}
//If less than 5% of max memory
long avail = MemoryBudgetController.getSystemAvailBytes();
if (avail < (MEMOERY_MAX_BYTES / 20)) {
- throw new RuntimeException(
- "Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is "
- + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
+ throw new RuntimeException("Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " + avail + ". Max memory is " + MEMOERY_MAX_BYTES);
}
}
}
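
For context, the cap check above is simple arithmetic: a fixed 40-byte bookkeeping overhead per entry plus the measured row bytes, times the entry count. As a worked example, rowMemBytes = 200 over 1,000,000 buffered rows estimates (40 + 200) * 1000000 = 240,000,000 bytes, roughly 240 MB, which is compared against MEMORY_USAGE_CAP before the coprocessor is allowed to continue.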
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
index 2b3b91b..63e3bdb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java
@@ -33,8 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
*/
public class CoprocessorFilter {
- public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter,
- FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) {
+ public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter, FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) {
// translate constants into dictionary IDs via a serialize copy
FilterDecorator filterDecorator = new FilterDecorator(dimEncMap, filterConstantsTreatment);
byte[] bytes = TupleFilterSerializer.serialize(rootFilter, filterDecorator, DictCodeSystem.INSTANCE);
@@ -44,8 +43,7 @@ public class CoprocessorFilter {
}
public static byte[] serialize(CoprocessorFilter o) {
- return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY
- : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE);
+ return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE);
}
public static CoprocessorFilter deserialize(byte[] filterBytes) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 65215f6..f6332f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -36,8 +36,7 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class CoprocessorProjector {
- public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid,
- final Collection<TblColRef> dimensionColumns) {
+ public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid, final Collection<TblColRef> dimensionColumns) {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
@@ -46,8 +45,7 @@ public class CoprocessorProjector {
}
@Override
- protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue,
- int outputValueOffset) {
+ protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue, int outputValueOffset) {
byte bits = dimensionColumns.contains(column) ? (byte) 0xff : 0x00;
Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, bits);
}
@@ -56,6 +54,7 @@ public class CoprocessorProjector {
byte[] mask = rowKeyMaskEncoder.encode(new String[cuboid.getColumns().size()]);
return new CoprocessorProjector(mask, dimensionColumns.size() != 0);
}
+
public static byte[] serialize(CoprocessorProjector o) {
ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);