You are viewing a plain-text version of this content; the link to the canonical version ("here") was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 16:15:47 UTC
[09/18] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number
KYLIN-2135 Enlarge FactDistinctColumns reducer number
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030
Branch: refs/heads/KYLIN-2006
Commit: 74214030272ffef275ccf0359b583b3278aec468
Parents: 47de961
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 26 19:35:20 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 5 ++
.../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++
.../kylin/engine/mr/DFSFileTableReader.java | 59 ++++++++++++++++----
.../kylin/engine/mr/common/BatchConstants.java | 5 ++
.../mr/steps/FactDistinctColumnPartitioner.java | 11 +---
.../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++-
.../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++-
.../mr/steps/FactDistinctColumnsReducer.java | 34 ++++++++++-
.../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++-
9 files changed, 170 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d9d10bb..6d3e807 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
}
+    //UHC: ultra high cardinality columns (ShardByColumns and GlobalDictionaryColumns); clamped to >= 1 because callers use it as a modulus
+    public int getUHCReducerCount() {
+        return Math.max(1, Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3")));
+    }
+
public String getOverrideHiveTableLocation(String table) {
return getOptional("hive.table.location." + table.toUpperCase());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 87bb93d..9893040 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
@@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider {
}
return holes;
}
+
+    private static final String GLOBAL_DICTIONARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    /** Returns one flag per getAllDictColumnsOnFact() column: 1 for UHC (shard-by or global-dictionary) columns, 0 otherwise. */
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+        int[] uhcIndex = new int[factDictCols.size()];
+
+        // flag the columns whose dictionary builder is GlobalDictionaryBuilder
+        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+        if (dictionaryDescList != null) {
+            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+                if (GLOBAL_DICTIONARY_CLASS.equalsIgnoreCase(dictionaryDesc.getBuilderClass())) { // null builder class -> false
+                    for (int i = 0; i < factDictCols.size(); i++) {
+                        if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                            uhcIndex[i] = 1;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        // flag the shard-by columns
+        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+        for (int i = 0; i < factDictCols.size(); i++) {
+            if (shardByColumns.contains(factDictCols.get(i))) {
+                uhcIndex[i] = 1;
+            }
+        }
+
+        return uhcIndex;
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..dda1d6f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
private String filePath;
private String delim;
- private RowReader reader;
+ private List<RowReader> readerList;
private String curLine;
private String[] curColumns;
@@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader {
this.filePath = filePath;
this.delim = delim;
this.expectedColumnNumber = expectedColumnNumber;
+ this.readerList = new ArrayList<RowReader>();
FileSystem fs = HadoopUtil.getFileSystem(filePath);
- try {
- this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
+ ArrayList<FileStatus> allFiles = new ArrayList<>();
+ FileStatus status = fs.getFileStatus(new Path(filePath));
+ if (status.isFile()) {
+ allFiles.add(status);
+ } else {
+ FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+ allFiles.addAll(Arrays.asList(listStatus));
+ }
+
+ try {
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
} catch (IOException e) {
if (isExceptionSayingNotSeqFile(e) == false)
throw e;
- this.reader = new CsvRowReader(fs, filePath);
+ this.readerList = new ArrayList<RowReader>();
+ for (FileStatus f : allFiles) {
+ RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+ this.readerList.add(rowReader);
+ }
}
}
@@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader {
@Override
public boolean next() throws IOException {
- curLine = reader.nextLine();
- curColumns = null;
- return curLine != null;
+        // Exhausted readers are closed and dropped, so later calls resume at the current file.
+        while (!readerList.isEmpty()) {
+            curLine = readerList.get(0).nextLine();
+            curColumns = null;
+            if (curLine != null) {
+                return true;
+            }
+            try {
+                readerList.remove(0).close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
+        return false;
}
public String getLine() {
@@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader {
}
@Override
- public void close() throws IOException {
- if (reader != null)
- reader.close();
+    public void close() {
+        for (RowReader reader : readerList) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
+        readerList.clear(); // a repeated close() becomes a no-op instead of closing readers twice
}
private String autoDetectDelim(String line) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
int NORMAL_RECORD_LOG_THRESHOLD = 100000;
int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+ /**
+ * dictionaries builder class
+ */
+ String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
/**
*/
public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
- private Configuration conf;
-
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
-
if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
// the last reducer is for merging hll
return numReduceTasks - 1;
} else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
- // the last reducer is for merging hll
+ // the second-to-last reducer (numReduceTasks - 2) receives the MARK_FOR_PARTITION_COL records
return numReduceTasks - 2;
} else {
- int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
- return colIndex;
+ return BytesUtil.readUnsigned(key.getBytes(), 0, 1); // byte 0 = reducer id written by the mapper; NOTE(review): ids must never equal the MARK_* bytes above - confirm the job's reducer cap guarantees this
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 28ee335..92da7d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+ int reducerCount = columnsNeedDict.size();
+ int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+ int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+ for(int index : uhcIndex) {
+ if(index == 1) {
+ reducerCount += uhcReducerCount - 1;
+ }
+ }
+
+ if (reducerCount > 255) {
+ throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
+ }
+
+
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -98,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
logger.info("Found segment: " + segment);
}
setupMapper(cube.getSegmentById(segmentID));
- setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+ setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected int[] dictionaryColumnIndex;
+ protected int uhcReducerCount;
+ protected int[] uhcIndex;
+ protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
@Override
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
- intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
dictionaryColumnIndex = new int[factDictCols.size()];
for (int i = 0; i < factDictCols.size(); i++) {
TblColRef colRef = factDictCols.get(i);
@@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
}
+ uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+ uhcReducerCount = cube.getConfig().getUHCReducerCount();
+ int count = 0;
+ for (int i = 0; i < uhcIndex.length; i++) {
+ columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i);
+ if (uhcIndex[i] == 1) {
+ count++;
+ }
+ }
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index ecbc6c2..5b00381 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
private boolean isStatistics = false;
private boolean isPartitionCol = false;
private KylinConfig cubeConfig;
+ private int uhcReducerCount;
+ private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+ private int taskId;
+
protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
@Override
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
int numberOfTasks = context.getNumReduceTasks();
- int taskId = context.getTaskAttemptID().getTaskID().getId();
+ taskId = context.getTaskAttemptID().getTaskID().getId();
+
+ uhcReducerCount = cube.getConfig().getUHCReducerCount();
+ initReducerIdToColumnIndex(config);
if (collectStatistics && (taskId == numberOfTasks - 1)) {
// hll
@@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
// col
isStatistics = false;
isPartitionCol = false;
- col = columnList.get(taskId);
+ col = columnList.get(ReducerIdToColumnIndex.get(taskId));
colValues = Lists.newLinkedList();
}
}
+    private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { // reducer id -> fact dict column index
+        int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        int uhcSeen = 0; // UHC columns already laid out before the current column
+        for (int col = 0; col < uhcIndex.length; col++) {
+            boolean isUhc = uhcIndex[col] == 1;
+            int beginId = uhcSeen * (uhcReducerCount - 1) + col; // same begin id the mapper computes
+            int span = isUhc ? Math.max(1, uhcReducerCount) : 1;
+            for (int offset = 0; offset < span; offset++) {
+                ReducerIdToColumnIndex.put(beginId + offset, col);
+            }
+            uhcSeen += isUhc ? 1 : 0;
+        }
+    }
+
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
final Configuration conf = context.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, col.getName());
+ final Path colDir = new Path(outputPath, col.getName());
+ final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+ final Path outputFile = new Path(colDir, fileName);
FSDataOutputStream out = null;
try {
+ if (!fs.exists(colDir)) {
+ fs.mkdirs(colDir);
+ }
+
if (fs.exists(outputFile)) {
out = fs.append(outputFile);
logger.info("append file " + outputFile);
http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 177c9f6..7a183b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
if (fieldValue == null)
continue;
int offset = keyBuffer.position();
- keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+ int reducerIndex;
+ if (uhcIndex[i] == 0) {
+ //for the normal dictionary column
+ reducerIndex = columnIndexToReducerBeginId.get(i);
+ } else {
+ //for the uhc
+ reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+ }
+
+ keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
keyBuffer.put(Bytes.toBytes(fieldValue));
outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
context.write(outputKey, EMPTY_TEXT);