You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/09 08:11:34 UTC
kylin git commit: KYLIN-2170 refactor doMap/doReduce/doCleanup
Repository: kylin
Updated Branches:
refs/heads/yang21 3b07c26a0 -> 1e24228ea
KYLIN-2170 refactor doMap/doReduce/doCleanup
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1e24228e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1e24228e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1e24228e
Branch: refs/heads/yang21
Commit: 1e24228ea9e4a070923a854e6a50fbe561ccc378
Parents: 3b07c26
Author: Li Yang <li...@apache.org>
Authored: Wed Nov 9 16:11:24 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Nov 9 16:11:24 2016 +0800
----------------------------------------------------------------------
build/conf/kylin_job_conf_inmem.xml | 2 +-
.../org/apache/kylin/engine/mr/KylinMapper.java | 47 +++++++++++++-
.../apache/kylin/engine/mr/KylinReducer.java | 51 ++++++++++++++-
.../engine/mr/steps/BaseCuboidMapperBase.java | 2 +-
.../kylin/engine/mr/steps/CuboidReducer.java | 2 +-
.../mr/steps/FactDistinctColumnPartitioner.java | 1 +
.../mr/steps/FactDistinctColumnsCombiner.java | 2 +-
.../mr/steps/FactDistinctColumnsMapperBase.java | 2 +-
.../mr/steps/FactDistinctColumnsReducer.java | 42 ++++++------
.../mr/steps/FactDistinctHiveColumnsMapper.java | 38 +++++------
.../engine/mr/steps/HiveToBaseCuboidMapper.java | 2 +-
.../engine/mr/steps/InMemCuboidMapper.java | 28 ++++----
.../engine/mr/steps/InMemCuboidReducer.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 2 +-
.../kylin/engine/mr/steps/NDCuboidMapper.java | 2 +-
.../steps/RowKeyDistributionCheckerMapper.java | 16 ++---
.../steps/RowKeyDistributionCheckerReducer.java | 2 +-
.../cardinality/ColumnCardinalityMapper.java | 26 ++++----
.../cardinality/ColumnCardinalityReducer.java | 40 +++++-------
.../storage/hbase/steps/CubeHFileMapper.java | 2 +-
.../hbase/steps/RangeKeyDistributionMapper.java | 14 ++--
.../steps/RangeKeyDistributionReducer.java | 68 +++++++++-----------
.../storage/hbase/steps/RowValueDecoder.java | 4 --
23 files changed, 227 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/build/conf/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml
index d6799d5..7e6dc08 100644
--- a/build/conf/kylin_job_conf_inmem.xml
+++ b/build/conf/kylin_job_conf_inmem.xml
@@ -94,7 +94,7 @@
<property>
<name>mapreduce.map.java.opts</name>
- <value>-Xmx2700m</value>
+ <value>-Xmx2700m -XX:OnOutOfMemoryError="kill -9 %p"</value>
<description></description>
</property>
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index 29c6844..a527b3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -18,6 +18,8 @@
package org.apache.kylin.engine.mr;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
@@ -25,11 +27,54 @@ import org.slf4j.LoggerFactory;
/**
*/
-public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
protected void bindCurrentConfiguration(Configuration conf) {
logger.info("The conf for current mapper will be " + System.identityHashCode(conf));
HadoopUtil.setCurrentConfiguration(conf);
}
+
+ @Override
+ final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ try {
+ doMap(key, value, context);
+ } catch (IOException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (InterruptedException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (RuntimeException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (Error ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ }
+ }
+
+ abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+
+ @Override
+ final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ try {
+ doCleanup(context);
+ } catch (IOException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (InterruptedException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (RuntimeException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (Error ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ }
+ }
+
+ protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 846c849..2987032 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -18,13 +18,62 @@
package org.apache.kylin.engine.mr;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*/
-public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+ private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
+
protected void bindCurrentConfiguration(Configuration conf) {
HadoopUtil.setCurrentConfiguration(conf);
}
+
+ @Override
+ final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ try {
+ doReduce(key, values, context);
+ } catch (IOException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (InterruptedException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (RuntimeException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (Error ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ }
+ }
+
+ abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+
+ @Override
+ final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ try {
+ doCleanup(context);
+ } catch (IOException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (InterruptedException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (RuntimeException ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ } catch (Error ex) { // KYLIN-2170
+ logger.error("", ex);
+ throw ex;
+ }
+ }
+
+ protected void doCleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 4f0d3fd..0649a0c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Lists;
/**
*/
-public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
public static final byte[] ONE = Bytes.toBytes("1");
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index d6e1d7e..9543f0a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -86,7 +86,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
}
@Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
aggs.reset();
for (Text value : values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index a631cf4..7801563 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -26,6 +26,7 @@ import org.apache.kylin.common.util.BytesUtil;
/**
*/
public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
+ @SuppressWarnings("unused")
private Configuration conf;
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
index 1821828..2dda047 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
@@ -34,7 +34,7 @@ public class FactDistinctColumnsCombiner extends KylinReducer<Text, Text, Text,
}
@Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// for hll, each key only has one output, no need to do local combine;
// for normal col, values are empty text
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..cb30b94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -41,7 +41,7 @@ import org.apache.kylin.metadata.model.TblColRef;
/**
*/
-public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
protected String cubeName;
protected CubeInstance cube;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index a9c5d4b..6f0bf4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
}
@Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if (isStatistics == false) {
colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
@@ -161,29 +161,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- if (isStatistics == false) {
- if (!outputTouched || colValues.size() > 0) {
- outputDistinctValues(col, colValues, context);
- colValues.clear();
- }
- } else {
- //output the hll info;
- long grandTotal = 0;
- for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
- grandTotal += hll.getCountEstimate();
- }
- double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-
- int mapperNumber = baseCuboidRowCountInMappers.size();
-
- writeMapperAndCuboidStatistics(context); // for human check
- CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
- cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (isStatistics == false) {
+ if (!outputTouched || colValues.size() > 0) {
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
}
- } catch (Throwable ex) {
- logger.error("", ex);
+ } else {
+ //output the hll info;
+ long grandTotal = 0;
+ for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+ grandTotal += hll.getCountEstimate();
+ }
+ double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+
+ int mapperNumber = baseCuboidRowCountInMappers.size();
+
+ writeMapperAndCuboidStatistics(context); // for human check
+ CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+ cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 2154bc6..061fc80 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -106,7 +106,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
}
@Override
- public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
String[] row = flatTableInputFormat.parseMapperInput(record);
try {
for (int i = 0; i < factDictCols.size(); i++) {
@@ -157,27 +157,23 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- if (collectStatistics) {
- ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- // output each cuboid's hll to reducer, key is 0 - cuboidId
- HyperLogLogPlusCounter hll;
- for (int i = 0; i < cuboidIds.length; i++) {
- hll = allCuboidsHLL[i];
-
- keyBuffer.clear();
- keyBuffer.put(MARK_FOR_HLL); // one byte
- keyBuffer.putLong(cuboidIds[i]);
- outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
- hllBuf.clear();
- hll.writeRegisters(hllBuf);
- outputValue.set(hllBuf.array(), 0, hllBuf.position());
- context.write(outputKey, outputValue);
- }
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (collectStatistics) {
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ HyperLogLogPlusCounter hll;
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = allCuboidsHLL[i];
+
+ keyBuffer.clear();
+ keyBuffer.put(MARK_FOR_HLL); // one byte
+ keyBuffer.putLong(cuboidIds[i]);
+ outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ context.write(outputKey, outputValue);
}
- } catch (Throwable ex) {
- ex.printStackTrace();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 83926cc..d9c5312 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -38,7 +38,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
}
@Override
- public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+ public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
counter++;
if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handled " + counter + " records!");
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index dac93cb..15bfd2e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -113,7 +113,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
}
@Override
- public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
// put each row to the queue
String[] row = flatTableInputFormat.parseMapperInput(record);
List<String> rowAsList = Arrays.asList(row);
@@ -130,25 +130,21 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
logger.info("Totally handled " + counter + " records!");
- try {
- while (!future.isDone()) {
- if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
- break;
- }
- }
-
- try {
- future.get();
- } catch (Exception e) {
- throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+ while (!future.isDone()) {
+ if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+ break;
}
- queue.clear();
- } catch (Throwable ex) {
- logger.error("", ex);
}
+
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+ }
+ queue.clear();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index cfecf23..d0a7062 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -73,7 +73,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
}
@Override
- public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
aggs.reset();
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 55b8474..67c0f4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -169,7 +169,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
}
@Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
long cuboidID = rowKeySplitter.split(key.getBytes());
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index b566c2e..8107e52 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -120,7 +120,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
}
@Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
long cuboidId = rowKeySplitter.split(key.getBytes());
Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index 21e97a3..fca91a6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -63,7 +63,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
}
@Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
for (Text t : keyList) {
if (key.compareTo(t) < 0) {
Long v = resultMap.get(t);
@@ -76,15 +76,11 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- LongWritable outputValue = new LongWritable();
- for (Entry<Text, Long> kv : resultMap.entrySet()) {
- outputValue.set(kv.getValue());
- context.write(kv.getKey(), outputValue);
- }
- } catch (Throwable ex) {
- ex.printStackTrace();
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ LongWritable outputValue = new LongWritable();
+ for (Entry<Text, Long> kv : resultMap.entrySet()) {
+ outputValue.set(kv.getValue());
+ context.write(kv.getKey(), outputValue);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
index 332cba5..d203e8c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
@@ -38,7 +38,7 @@ public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWri
}
@Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long length = 0;
for (LongWritable v : values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 8c624e3..06a07ca 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -66,7 +66,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
}
@Override
- public void map(T key, Object value, Context context) throws IOException, InterruptedException {
+ public void doMap(T key, Object value, Context context) throws IOException, InterruptedException {
ColumnDesc[] columns = tableDesc.getColumns();
String[] values = tableInputFormat.parseMapperInput(value);
@@ -95,20 +95,16 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- Iterator<Integer> it = hllcMap.keySet().iterator();
- ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
- }
- } catch (Throwable ex) {
- ex.printStackTrace();
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ Iterator<Integer> it = hllcMap.keySet().iterator();
+ ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ while (it.hasNext()) {
+ int key = it.next();
+ HyperLogLogPlusCounter hllc = hllcMap.get(key);
+ buf.clear();
+ hllc.writeRegisters(buf);
+ buf.flip();
+ context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 2551af3..ea66999 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -49,7 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
}
@Override
- public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
int skey = key.get();
for (BytesWritable v : values) {
ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
@@ -68,28 +68,22 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- List<Integer> keys = new ArrayList<Integer>();
- Iterator<Integer> it = hllcMap.keySet().iterator();
- while (it.hasNext()) {
- keys.add(it.next());
- }
- Collections.sort(keys);
- it = keys.iterator();
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
- // context.write(new Text("ErrorRate_" + key), new
- // LongWritable((long)hllc.getErrorRate()));
- }
- } catch (Throwable ex) {
- ex.printStackTrace();
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ List<Integer> keys = new ArrayList<Integer>();
+ Iterator<Integer> it = hllcMap.keySet().iterator();
+ while (it.hasNext()) {
+ keys.add(it.next());
+ }
+ Collections.sort(keys);
+ it = keys.iterator();
+ while (it.hasNext()) {
+ int key = it.next();
+ HyperLogLogPlusCounter hllc = hllcMap.get(key);
+ ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ buf.clear();
+ hllc.writeRegisters(buf);
+ buf.flip();
+ context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 8205ff7..371a83b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -74,7 +74,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
}
@Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
outputKey.set(key.getBytes(), 0, key.getLength());
KeyValue outputValue;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
index c82d58d..c66ccb3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -44,7 +44,7 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
}
@Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
lastKey = key;
int bytesLength = key.getLength() + value.getLength();
@@ -61,14 +61,10 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- }
- } catch (Throwable ex) {
- ex.printStackTrace();
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (lastKey != null) {
+ outputValue.set(bytesRead);
+ context.write(lastKey, outputValue);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index e9918d4..63433dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -84,7 +84,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
}
@Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
for (LongWritable v : values) {
bytesRead += v.get();
}
@@ -96,42 +96,38 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- int nRegion = Math.round((float) gbPoints.size() / cut);
- nRegion = Math.max(minRegionCount, nRegion);
- nRegion = Math.min(maxRegionCount, nRegion);
-
- int gbPerRegion = gbPoints.size() / nRegion;
- gbPerRegion = Math.max(1, gbPerRegion);
-
- if (hfileSizeGB <= 0) {
- hfileSizeGB = gbPerRegion;
- }
- int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
- hfilePerRegion = Math.max(1, hfilePerRegion);
-
- System.out.println(nRegion + " regions");
- System.out.println(gbPerRegion + " GB per region");
- System.out.println(hfilePerRegion + " hfile per region");
-
- Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
- SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
- int hfileCountInOneRegion = 0;
- for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
- hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
- if (++hfileCountInOneRegion >= hfilePerRegion) {
- Text key = gbPoints.get(i);
- outputValue.set(i);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- context.write(key, outputValue);
-
- hfileCountInOneRegion = 0;
- }
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ int nRegion = Math.round((float) gbPoints.size() / cut);
+ nRegion = Math.max(minRegionCount, nRegion);
+ nRegion = Math.min(maxRegionCount, nRegion);
+
+ int gbPerRegion = gbPoints.size() / nRegion;
+ gbPerRegion = Math.max(1, gbPerRegion);
+
+ if (hfileSizeGB <= 0) {
+ hfileSizeGB = gbPerRegion;
+ }
+ int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
+ hfilePerRegion = Math.max(1, hfilePerRegion);
+
+ System.out.println(nRegion + " regions");
+ System.out.println(gbPerRegion + " GB per region");
+ System.out.println(hfilePerRegion + " hfile per region");
+
+ Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+ SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
+ int hfileCountInOneRegion = 0;
+ for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+ hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+ if (++hfileCountInOneRegion >= hfilePerRegion) {
+ Text key = gbPoints.get(i);
+ outputValue.set(i);
+ System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+ context.write(key, outputValue);
+
+ hfileCountInOneRegion = 0;
}
- hfilePartitionWriter.close();
- } catch (Throwable ex) {
- logger.error("", ex);
}
+ hfilePartitionWriter.close();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 86104e2..b02183a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -31,15 +31,11 @@ import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.storage.hbase.util.Results;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*/
public class RowValueDecoder implements Cloneable {
- private static final Logger logger = LoggerFactory.getLogger(RowValueDecoder.class);
-
private final HBaseColumnDesc hbaseColumn;
private final byte[] hbaseColumnFamily;
private final byte[] hbaseColumnQualifier;