You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/01 01:12:33 UTC
[kylin] 01/01: KYLIN-3378 Kafka join with hive
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch KYLIN-3378
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d37c236ee9865846fed4909eec983f7644f6fd35
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri May 25 18:25:48 2018 +0800
KYLIN-3378 Kafka join with hive
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../org/apache/kylin/common/KylinConfigBase.java | 39 ++++-
.../org/apache/kylin/common/util/BasicTest.java | 3 +-
.../kylin/cube/model/CubeJoinedFlatTableDesc.java | 24 ++-
.../cube/model/CubeJoinedFlatTableEnrich.java | 10 ++
.../model/validation/rule/StreamingCubeRule.java | 11 --
.../java/org/apache/kylin/job/JoinedFlatTable.java | 44 ++---
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/metadata/model/IJoinedFlatTableDesc.java | 4 +
.../org/apache/kylin/source/hive/HiveMRInput.java | 68 +++++---
source-kafka/pom.xml | 4 +
.../apache/kylin/source/kafka/KafkaMRInput.java | 194 ++++++++++++++-------
.../org/apache/kylin/source/kafka/KafkaSource.java | 5 +-
.../kylin/source/kafka/config/KafkaConfig.java | 12 ++
.../source/kafka/hadoop/KafkaFlatTableJob.java | 7 +-
.../source/kafka/hadoop/KafkaFlatTableMapper.java | 24 ++-
.../source/kafka/hadoop/KafkaInputFormat.java | 18 +-
.../apache/kylin/source/kafka/SpiltNumTest.java | 163 +++++++++++++++++
17 files changed, 490 insertions(+), 141 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 689d08f..cdb3755 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -225,7 +225,11 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.env", "DEV");
}
- private String cachedHdfsWorkingDirectory;
+ static public String cachedHdfsWorkingDirectory;//////
+
+ public void setHdfsWorkingDirectory(String cachedHdfsWorkingDirectory){///////
+ this.cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory;
+ }
public String getHdfsWorkingDirectory() {
if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +264,39 @@ abstract public class KylinConfigBase implements Serializable {
return cachedHdfsWorkingDirectory;
}
+ public String getHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {//
+ if (cachedHdfsWorkingDirectory != null)
+ return cachedHdfsWorkingDirectory;
+
+ String root = getOptional("kylin.env.hdfs-working-dir", "/kylin");
+
+ Path path = new Path(root);
+ if (!path.isAbsolute())
+ throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+ // make sure path is qualified
+ try {
+ FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+ path = fs.makeQualified(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // append metadata-url prefix
+ root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+
+ if (!root.endsWith("/"))
+ root += "/";
+
+ cachedHdfsWorkingDirectory = root;
+ if (cachedHdfsWorkingDirectory.startsWith("file:")) {
+ cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("file:", "file://");
+ } else if (cachedHdfsWorkingDirectory.startsWith("maprfs:")) {
+ cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("maprfs:", "maprfs://");
+ }
+ return cachedHdfsWorkingDirectory;
+ }
+
public String getZookeeperBasePath() {
return getOptional("kylin.env.zookeeper-base-path", "/kylin");
}
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1c1e389..6ae238b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -231,7 +231,8 @@ public class BasicTest {
String[] origin = new String[] {"ab,c", "cd|e"};
- String delimiter = "\u001F"; // "\t";
+ // test with sequence file default delimiter
+ String delimiter = "\01"; //"\u001F"; "\t";
String concated = StringUtils.join(Arrays.asList(origin), delimiter);
String[] newValues = concated.split(delimiter);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index d50a5af..70ad13e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -42,8 +42,8 @@ import com.google.common.collect.Maps;
@SuppressWarnings("serial")
public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable {
- protected final String tableName;
- protected final CubeDesc cubeDesc;
+ protected String tableName;/////
+ protected final CubeDesc cubeDesc;///
protected final CubeSegment cubeSegment;
protected final boolean includingDerived;
@@ -135,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
}
}
+ @Override
+ public List<TblColRef> getFactColumns() {
+ final List<TblColRef> factColumns = Lists.newArrayList();
+ for (TblColRef col : this.getAllColumns()) {
+ if (col.getTableRef().equals(getDataModel().getRootFactTable())) {
+ // only fetch the columns from fact table
+ factColumns.add(col);
+ }
+ }
+ return factColumns;
+ }
+
// sanity check the input record (in bytes) matches what's expected
public void sanityCheck(BytesSplitter bytesSplitter) {
if (columnCount != bytesSplitter.getBufferSize()) {
@@ -171,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
@Override
public SegmentRange getSegRange() {
+ if (cubeSegment.isOffsetCube()) {
+ return null;
+ }
return cubeSegment.getSegRange();
}
@@ -185,6 +200,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
}
@Override
+ public boolean useAlias() {
+ return true;
+ }
+
+ @Override
public TblColRef getClusterBy() {
return cubeDesc.getClusteredByColumn();
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 73da802..f09478e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -105,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
}
@Override
+ public List<TblColRef> getFactColumns() {
+ return flatDesc.getFactColumns();
+ }
+
+ @Override
public DataModelDesc getDataModel() {
return flatDesc.getDataModel();
}
@@ -130,6 +135,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
}
@Override
+ public boolean useAlias() {
+ return flatDesc.useAlias();
+ }
+
+ @Override
public TblColRef getClusterBy() {
return flatDesc.getClusterBy();
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index 4438706..647f4c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ResultLevel;
import org.apache.kylin.cube.model.validation.ValidateContext;
import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.TblColRef;
@@ -49,16 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
return;
}
- if (model.getLookupTables().size() > 0) {
- context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
- return;
- }
-
- if (cube.getEngineType() == IEngineAware.ID_SPARK) {
- context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
- return;
- }
-
if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) {
context.addResult(ResultLevel.ERROR, "Must define a partition column.");
return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 528bcf0..0769dcf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -44,6 +44,8 @@ import org.apache.kylin.metadata.model.TblColRef;
public class JoinedFlatTable {
+ public static final String TEXTFILE = "TEXTFILE";
+
public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
return storageDfsDir + "/" + flatDesc.getTableName();
}
@@ -61,13 +63,13 @@ public class JoinedFlatTable {
}
public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
- String storageFormat) {
+ String storageFormat) {
String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter();
return generateCreateTableStatement(flatDesc, storageDfsDir, storageFormat, fieldDelimiter);
}
public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
- String storageFormat, String fieldDelimiter) {
+ String storageFormat, String fieldDelimiter) {
StringBuilder ddl = new StringBuilder();
ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -78,10 +80,10 @@ public class JoinedFlatTable {
if (i > 0) {
ddl.append(",");
}
- ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
+ ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n");
}
ddl.append(")" + "\n");
- if ("TEXTFILE".equals(storageFormat)) {
+ if (TEXTFILE.equals(storageFormat)) {
ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n");
}
ddl.append("STORED AS " + storageFormat + "\n");
@@ -96,6 +98,12 @@ public class JoinedFlatTable {
return ddl.toString();
}
+ public static String generateDropTableStatement1(IJoinedFlatTableDesc flatDesc) {
+ StringBuilder ddl = new StringBuilder();
+ ddl.append("DROP TABLE IF EXISTS " + flatDesc.getDataModel().getRootFactTableName() + ";").append("\n");
+ return ddl.toString();
+ }///
+
public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) {
CubeSegment segment = ((CubeSegment) flatDesc.getSegment());
KylinConfig kylinConfig;
@@ -120,11 +128,6 @@ public class JoinedFlatTable {
+ ";\n";
}
- public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
- return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
- + ";\n";
- }
-
public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
return generateSelectDataStatement(flatDesc, false, null);
}
@@ -146,7 +149,7 @@ public class JoinedFlatTable {
if (skipAsList.contains(colTotalName)) {
sql.append(col.getExpressionInSourceDB() + sep);
} else {
- sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+ sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
}
}
appendJoinStatement(flatDesc, sql, singleLine);
@@ -154,15 +157,6 @@ public class JoinedFlatTable {
return sql.toString();
}
- public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
- final StringBuilder sql = new StringBuilder();
- final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
- sql.append("dfs -mkdir -p " + outputDir + ";\n");
- sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity()
- + " " + rootTbl.getAlias() + "\n");
- appendWhereStatement(flatDesc, sql);
- return sql.toString();
- }
public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
final String sep = singleLine ? " " : "\n";
@@ -175,7 +169,6 @@ public class JoinedFlatTable {
for (JoinTableDesc lookupDesc : model.getJoinTables()) {
JoinDesc join = lookupDesc.getJoin();
if (join != null && join.getType().equals("") == false) {
- String joinType = join.getType().toUpperCase();
TableRef dimTable = lookupDesc.getTableRef();
if (!dimTableCache.contains(dimTable)) {
TblColRef[] pk = join.getPrimaryKeyColumns();
@@ -183,6 +176,8 @@ public class JoinedFlatTable {
if (pk.length != fk.length) {
throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
}
+ String joinType = join.getType().toUpperCase();
+
sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
sql.append("ON ");
for (int i = 0; i < pk.length; i++) {
@@ -201,7 +196,7 @@ public class JoinedFlatTable {
private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
if (redistCol != null) {
- sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n");
+ sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
} else {
sql.append(" DISTRIBUTE BY RAND()").append(";\n");
}
@@ -243,8 +238,13 @@ public class JoinedFlatTable {
sql.append(whereBuilder.toString());
}
+
private static String colName(TblColRef col) {
- return col.getTableAlias() + "_" + col.getName();
+ return colName(col, true);
+ }
+
+ private static String colName(TblColRef col, boolean useAlias) {
+ return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
}
private static String getHiveDataType(String javaDataType) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651..42f0dbf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -34,6 +34,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
+ public static final String STEP_NAME_CREATE_HIVE_TABLE = "Create Hive Table";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 0589829..8f86a52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc {
List<TblColRef> getAllColumns();
+ List<TblColRef> getFactColumns();
+
int getColumnIndex(TblColRef colRef);
SegmentRange getSegRange();
@@ -41,4 +43,6 @@ public interface IJoinedFlatTableDesc {
// optionally present
ISegment getSegment();
+ boolean useAlias();
+
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a96f4d5..0e791eb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
- step.setIntermediateTableIdentity(getIntermediateTableIdentity());
- step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
+ step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+ step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
jobFlow.addTask(step);
}
@@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput {
private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
StringBuffer output = new StringBuffer();
- final String hiveTable = this.getIntermediateTableIdentity();
- if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
- hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
- config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
- output.append("Hive table " + hiveTable + " is dropped. \n");
- rmdirOnHDFS(getExternalDataPath());
- output.append(
- "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final List<String> hiveTables = this.getIntermediateTables();
+ for (String hiveTable : hiveTables) {
+ if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+ hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+ hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
+
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ }
}
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ rmdirOnHDFS(getExternalDataPaths());
+ output.append(
+ "Path " + getExternalDataPaths() + " is deleted. \n");
+
return output.toString();
}
- private void rmdirOnHDFS(String path) throws IOException {
- Path externalDataPath = new Path(path);
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
+ private void rmdirOnHDFS(List<String> paths) throws IOException {
+ for (String path : paths) {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ }
}
}
- public void setIntermediateTableIdentity(String tableIdentity) {
- setParam("oldHiveTable", tableIdentity);
+ public void setIntermediateTables(List<String> tableIdentity) {
+ setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
}
- private String getIntermediateTableIdentity() {
- return getParam("oldHiveTable");
+ private List<String> getIntermediateTables() {
+ List<String> intermediateTables = Lists.newArrayList();
+ String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+ for (String t : tables) {
+ intermediateTables.add(t);
+ }
+ return intermediateTables;
}
- public void setExternalDataPath(String externalDataPath) {
- setParam("externalDataPath", externalDataPath);
+ public void setExternalDataPaths(List<String> externalDataPaths) {
+ setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
}
- private String getExternalDataPath() {
- return getParam("externalDataPath");
+ private List<String> getExternalDataPaths() {
+ String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+ List<String> result = Lists.newArrayList();
+ for (String s : paths) {
+ result.add(s);
+ }
+ return result;
}
public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 2ef4cdf..55df7f0 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -66,5 +66,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-source-hive</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 223e303..f37bf50 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.kafka;
import java.io.IOException;
@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
+import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -41,16 +41,17 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveMRInput;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
import org.slf4j.Logger;
@@ -58,18 +59,19 @@ import org.slf4j.LoggerFactory;
public class KafkaMRInput implements IMRInput {
- CubeSegment cubeSegment;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
+ private CubeSegment cubeSegment;
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
this.cubeSegment = (CubeSegment) flatDesc.getSegment();
- return new BatchCubingInputSide(cubeSegment);
+ return new BatchCubingInputSide(cubeSegment, flatDesc);
}
@Override
public IMRTableInputFormat getTableInputFormat(TableDesc table) {
- return new KafkaTableInputFormat(cubeSegment, null, null, null);
+ return new KafkaTableInputFormat(cubeSegment, null);
}
@Override
@@ -80,12 +82,11 @@ public class KafkaMRInput implements IMRInput {
public static class KafkaTableInputFormat implements IMRTableInputFormat {
private final CubeSegment cubeSegment;
private final JobEngineConfig conf;
- private final String delimiter;
+ private String delimiter = "\01";
- public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
+ public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
this.cubeSegment = cubeSegment;
this.conf = conf;
- this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
}
@Override
@@ -114,30 +115,132 @@ public class KafkaMRInput implements IMRInput {
final JobEngineConfig conf;
final CubeSegment seg;
- private String outputPath;
-
- public BatchCubingInputSide(CubeSegment seg) {
+ private CubeDesc cubeDesc ;
+ private KylinConfig config;
+ protected IJoinedFlatTableDesc flatDesc;
+ protected String hiveTableDatabase;
+ private List<String> intermediateTables = Lists.newArrayList();
+ private List<String> intermediatePaths = Lists.newArrayList();
+ private String cubeName;
+
+ public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ this.config = seg.getConfig();
+ this.flatDesc = flatDesc;
+ this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
this.seg = seg;
+ this.cubeDesc = seg.getCubeDesc();
+ this.cubeName = seg.getCubeInstance().getName();
}
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+
+ boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+ final String baseLocation = getJobWorkingDir(jobFlow);
+ if (onlyOneTable) {
+ // directly use flat table location
+ final String intermediateFactTable = flatDesc.getTableName();
+ final String tableLocation = baseLocation + "/" + intermediateFactTable;
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+ intermediatePaths.add(tableLocation);
+ } else {
+ final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
+ + seg.getUuid().replaceAll("-", "_") + "_fact";
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
+ jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+ }
+ }
+ private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+ final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+ @Override
+ public String getTableName() {
+ return mockFactTableName;
+ }
+
+ @Override
+ public DataModelDesc getDataModel() {
+ return cubeDesc.getModel();
+ }
+
+ @Override
+ public List<TblColRef> getAllColumns() {
+ return flatDesc.getFactColumns();
+ }
+
+ @Override
+ public List<TblColRef> getFactColumns() {
+ return null;
+ }
+
+ @Override
+ public int getColumnIndex(TblColRef colRef) {
+ return 0;
+ }
+
+ @Override
+ public SegmentRange getSegRange() {
+ return null;
+ }
+
+ @Override
+ public TblColRef getDistributedBy() {
+ return null;
+ }
+
+ @Override
+ public TblColRef getClusterBy() {
+ return null;
+ }
+
+ @Override
+ public ISegment getSegment() {
+ return null;
+ }
+
+ @Override
+ public boolean useAlias() {
+ return false;
+ }
+ };
+ final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+ final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation);
+
+
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+ String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+ insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
+
+ CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+ CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+ step.setInitStatement(hiveInitStatements);
+ step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+ step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+ intermediateTables.add(flatDesc.getTableName());
+ intermediateTables.add(mockFactTableName);
+ intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+ intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+ return step;
}
- private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
- MapReduceExecutable result = new MapReduceExecutable();
+ protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+ return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+ }
- IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
- outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
+ MapReduceExecutable result = new MapReduceExecutable();
result.setName("Save data from Kafka");
result.setMapReduceJobClass(KafkaFlatTableJob.class);
JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
StringBuilder cmd = new StringBuilder();
jobBuilderSupport.appendMapReduceParameters(cmd);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
@@ -147,20 +250,17 @@ public class KafkaMRInput implements IMRInput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- GarbageCollectionStep step = new GarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
- step.setDataPath(outputPath);
+ HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+ step.setIntermediateTables(intermediateTables);
+ step.setExternalDataPaths(intermediatePaths);
jobFlow.addTask(step);
}
@Override
public IMRTableInputFormat getFlatTableInputFormat() {
- KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
- KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable());
- List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
- return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+ return new KafkaTableInputFormat(seg, conf);
}
}
@@ -178,43 +278,11 @@ public class KafkaMRInput implements IMRInput {
final MergeOffsetStep result = new MergeOffsetStep();
result.setName("Merge offset step");
- CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
jobFlow.addTask(result);
}
}
- public static class GarbageCollectionStep extends AbstractExecutable {
- private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- rmdirOnHDFS(getDataPath());
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return ExecuteResult.createError(e);
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
- }
-
- private void rmdirOnHDFS(String path) throws IOException {
- Path externalDataPath = new Path(path);
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
- }
- }
-
- public void setDataPath(String externalDataPath) {
- setParam("dataPath", externalDataPath);
- }
-
- private String getDataPath() {
- return getParam("dataPath");
- }
-
- }
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0ab83c6..1d65b96 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
@@ -224,7 +225,9 @@ public class KafkaSource implements ISource {
public List<String> getRelatedKylinResources(TableDesc table) {
List<String> dependentResources = Lists.newArrayList();
dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
- dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+ if (table.getSourceType() == ISourceAware.ID_STREAMING) {
+ dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+ }
return dependentResources;
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 696c20c..c31132b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("margin")
private long margin;
+ @JsonProperty("splitRows")
+ private int splitRows=1000000;
+
//"configA=1;configB=2"
@JsonProperty("parserProperties")
private String parserProperties;
@@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity {
return sb.toString();
}
+
+ public int getSplitRows() {
+ return splitRows;
+ }
+
+ public void setSplitRows(int splitRows) {
+ this.splitRows = splitRows;
+ }
+
@Override
public KafkaConfig clone() {
try {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b71ca84..e106a0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+ public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows";
+
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
@@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+ job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
@@ -152,7 +155,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.setMapperClass(KafkaFlatTableMapper.class);
job.setInputFormatClass(KafkaInputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
+ job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(0);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index 9fe29ca..b452b12 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -38,14 +37,18 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.StreamingParser;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> {
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, BytesWritable, Text> {
- private NullWritable outKey = NullWritable.get();
+ private BytesWritable outKey = new BytesWritable();
+ private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class);
private Text outValue = new Text();
private KylinConfig config;
private CubeSegment cubeSegment;
@@ -60,15 +63,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
config = AbstractHadoopJob.loadKylinPropsAndMetadata();
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
- this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
- KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
- KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
- List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
+ this.delimiter = "\01";//sequence file default delimiter
+ logger.info("Use delimiter: " + delimiter);
+ final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
+ final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
+
+ final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+ final List<TblColRef> allColumns = flatTableDesc.getFactColumns();
try {
- streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
+ streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns);
} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException(e);
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index c996c5f..c3ed47f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+ final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS));
final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -79,9 +80,18 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
}
- if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
- InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
- splits.add(split);
+ long new_start = startOffsetMap.get(partitionId);
+ long end = endOffsetMap.get(partitionId);
+ while (end > new_start) {
+ if ((end - new_start) <= spiltRows && (end > new_start)) {
+ InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
+ splits.add(split);
+ break;
+ } else {
+ InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltRows);
+ splits.add(split);
+ new_start += spiltRows;
+ }
}
}
}
@@ -93,4 +103,4 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
return new KafkaInputRecordReader();
}
-}
+}
\ No newline at end of file
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
new file mode 100644
index 0000000..9dfb641
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka;
+
+import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.google.common.collect.Maps;
+
+public class SpiltNumTest {
+
+    /**
+     * Builds Kafka input splits for four hard-coded partitions, capping each
+     * split at {@code SPLIT_ROWS} offsets. Mirrors the chunking loop in
+     * {@code KafkaInputFormat#getSplits} so that logic can be unit tested
+     * without a live Kafka cluster.
+     */
+    public static List<InputSplit> getSplits() {
+
+        final String brokers = "brokers";
+        final String inputTopic = "topic";
+        final int splitRows = 10;
+
+        final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        startOffsetMap.put(0, 0L);
+        endOffsetMap.put(0, 15L);
+        startOffsetMap.put(1, 4L);
+        endOffsetMap.put(1, 26L);
+        startOffsetMap.put(2, 15L);
+        endOffsetMap.put(2, 47L);
+        startOffsetMap.put(3, 39L);
+        endOffsetMap.put(3, 41L);
+
+        final List<InputSplit> splits = new ArrayList<InputSplit>();
+        for (int partitionId = 0; partitionId < 4; partitionId++) {
+            long start = startOffsetMap.get(partitionId);
+            final long end = endOffsetMap.get(partitionId);
+            // Chunk [start, end) into pieces of at most splitRows offsets;
+            // only the trailing chunk may be shorter than splitRows.
+            while (end > start) {
+                long chunkEnd = Math.min(end, start + splitRows);
+                splits.add(new KafkaInputSplit(brokers, inputTopic, partitionId, start, chunkEnd));
+                start = chunkEnd;
+            }
+        }
+        return splits;
+    }
+
+    /** Returns true if any generated split's string form contains the given description. */
+    private static boolean containsSplit(List<InputSplit> splits, String expected) {
+        for (InputSplit split : splits) {
+            if (split.toString().contains(expected)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Test
+    public void testSpiltNum() {
+        // 15 + 22 + 32 + 2 offsets chunked by 10 => 2 + 3 + 4 + 1 = 10 splits.
+        Assert.assertEquals(10, getSplits().size());
+    }
+
+    @Test
+    public void testSpilt() {
+        List<InputSplit> splits = getSplits();
+        // Every expected per-partition chunk boundary must be present.
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-0-0-10"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-0-10-15"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-1-4-14"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-1-14-24"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-1-24-26"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-2-15-25"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-2-25-35"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-2-35-45"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-2-45-47"));
+        Assert.assertTrue(containsSplit(splits, "brokers-topic-3-39-41"));
+        // A split spanning offsets across a whole partition must NOT exist,
+        // proving the range was actually chunked.
+        Assert.assertFalse(containsSplit(splits, "brokers-topic-0-4-47"));
+    }
+}
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.