You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/07 04:29:35 UTC
[18/50] [abbrv] kylin git commit: refine mapper and reducer log
refine mapper and reducer log
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/58224921
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/58224921
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/58224921
Branch: refs/heads/master-hbase1.x
Commit: 58224921d896e4479f5d034d43c044aacaf14200
Parents: 28ba1ea
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Dec 1 18:15:46 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Dec 1 18:15:46 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/engine/mr/KylinMapper.java | 12 +++++++++---
.../java/org/apache/kylin/engine/mr/KylinReducer.java | 13 ++++++++++---
.../apache/kylin/engine/mr/steps/CuboidReducer.java | 11 ++++-------
.../kylin/engine/mr/steps/HiveToBaseCuboidMapper.java | 6 ------
.../kylin/engine/mr/steps/InMemCuboidMapper.java | 7 +------
.../kylin/engine/mr/steps/InMemCuboidReducer.java | 10 +++++-----
.../apache/kylin/engine/mr/steps/NDCuboidMapper.java | 10 ++++------
7 files changed, 33 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a01f7a2..2b564e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
+ protected int mapCounter = 0;
+
protected void bindCurrentConfiguration(Configuration conf) {
logger.info("The conf for current mapper will be " + System.identityHashCode(conf));
HadoopUtil.setCurrentConfiguration(conf);
@@ -38,6 +41,9 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
@Override
final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
+ if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Accepting Mapper Key with ordinal: " + mapCounter);
+ }
doMap(key, value, context);
} catch (IOException ex) { // KYLIN-2170
logger.error("", ex);
@@ -53,11 +59,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
throw ex;
}
}
-
+
protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
super.map(key, value, context);
}
-
+
@Override
final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
@@ -76,7 +82,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
throw ex;
}
}
-
+
protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2b63ce0..cb2d7a7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +30,9 @@ import org.slf4j.LoggerFactory;
*/
public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
-
+
+ protected int reduceCounter = 0;
+
protected void bindCurrentConfiguration(Configuration conf) {
HadoopUtil.setCurrentConfiguration(conf);
}
@@ -37,6 +40,10 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI
@Override
final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
+ if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Accepting Mapper Key with ordinal: " + reduceCounter);
+ }
+
doReduce(key, values, context);
} catch (IOException ex) { // KYLIN-2170
logger.error("", ex);
@@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI
throw ex;
}
}
-
+
protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
super.reduce(key, values, context);
}
-
+
@Override
final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 9543f0a..b1d4aaa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -50,11 +50,11 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
private BufferedMeasureCodec codec;
private MeasureAggregators aggs;
- private int counter;
private int cuboidLevel;
private boolean[] needAggr;
private Object[] input;
private Object[] result;
+ private int vcounter;
private Text outputValue = new Text();
@@ -90,6 +90,9 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
aggs.reset();
for (Text value : values) {
+ if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling value with ordinal: " + vcounter);
+ }
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
if (cuboidLevel > 0) {
aggs.aggregate(input, needAggr);
@@ -103,11 +106,5 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
outputValue.set(valueBuf.array(), 0, valueBuf.position());
context.write(key, outputValue);
-
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 9fa20ae..428f878 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
/**
* @author George Song (ysong1)
@@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
@Override
public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
String[] row = flatTableInputFormat.parseMapperInput(value);
try {
outputKV(row, context);
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 15bfd2e..116d5e0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -64,7 +64,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
private CubeSegment cubeSegment;
private IMRTableInputFormat flatTableInputFormat;
- private int counter;
private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
private Future<?> future;
@@ -120,10 +119,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
while (!future.isDone()) {
if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
break;
}
}
@@ -131,7 +126,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
@Override
protected void doCleanup(Context context) throws IOException, InterruptedException {
- logger.info("Totally handled " + counter + " records!");
+ logger.info("Totally handled " + mapCounter + " records!");
while (!future.isDone()) {
if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index d0a7062..04c9e90 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -46,10 +46,11 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
private BufferedMeasureCodec codec;
private MeasureAggregators aggs;
- private int counter;
private Object[] input;
private Object[] result;
+ private int vcounter;
+
private Text outputKey;
private Text outputValue;
@@ -78,6 +79,9 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
aggs.reset();
for (ByteArrayWritable value : values) {
+ if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling value with ordinal: " + vcounter);
+ }
codec.decode(value.asBuffer(), input);
aggs.aggregate(input);
}
@@ -92,10 +96,6 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
context.write(outputKey, outputValue);
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 8107e52..01cdd4a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -129,18 +129,16 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
// if still empty or null
if (myChildren == null || myChildren.size() == 0) {
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
- skipCounter++;
- if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Skipped " + skipCounter + " records!");
+ if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Skipping record with ordinal: " + skipCounter);
}
return;
}
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
- handleCounter++;
- if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + handleCounter + " records!");
+ if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling record with ordinal: " + handleCounter);
}
for (Long child : myChildren) {