You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/12/01 10:02:02 UTC
kylin git commit: refine mapper and reducer logging
Repository: kylin
Updated Branches:
refs/heads/yang21 80018874c -> d3ecb0d9c
refine mapper and reducer logging
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3ecb0d9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3ecb0d9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3ecb0d9
Branch: refs/heads/yang21
Commit: d3ecb0d9c381dbb035c7cada7d3c798e24fef1d1
Parents: 8001887
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Dec 1 18:01:55 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Dec 1 18:01:55 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/KylinMapper.java | 17 +++++++---
.../apache/kylin/engine/mr/KylinReducer.java | 17 +++++++---
.../engine/mr/steps/BaseCuboidMapperBase.java | 1 -
.../kylin/engine/mr/steps/CuboidReducer.java | 21 ++++++------
.../engine/mr/steps/HiveToBaseCuboidMapper.java | 10 ++----
.../engine/mr/steps/InMemCuboidMapper.java | 34 ++++++++------------
.../engine/mr/steps/InMemCuboidReducer.java | 20 ++++++------
.../kylin/engine/mr/steps/NDCuboidMapper.java | 22 ++++++-------
8 files changed, 70 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a01f7a2..c5af2fe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -18,18 +18,21 @@
package org.apache.kylin.engine.mr;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
*/
public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
+ protected int mapCounter = 0;
+
protected void bindCurrentConfiguration(Configuration conf) {
logger.info("The conf for current mapper will be " + System.identityHashCode(conf));
HadoopUtil.setCurrentConfiguration(conf);
@@ -38,6 +41,10 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
@Override
final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
+ if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Accepting Mapper Key with ordinal: " + mapCounter);
+ }
+
doMap(key, value, context);
} catch (IOException ex) { // KYLIN-2170
logger.error("", ex);
@@ -53,11 +60,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
throw ex;
}
}
-
+
protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
super.map(key, value, context);
}
-
+
@Override
final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
@@ -76,7 +83,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN,
throw ex;
}
}
-
+
protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2b63ce0..83266ea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -18,18 +18,22 @@
package org.apache.kylin.engine.mr;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
*/
public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
-
+
+ protected int reduceCounter = 0;
+
+
protected void bindCurrentConfiguration(Configuration conf) {
HadoopUtil.setCurrentConfiguration(conf);
}
@@ -37,6 +41,9 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI
@Override
final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
+ if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Accepting Reducer Key with ordinal: " + reduceCounter);
+ }
doReduce(key, values, context);
} catch (IOException ex) { // KYLIN-2170
logger.error("", ex);
@@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI
throw ex;
}
}
-
+
protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
super.reduce(key, values, context);
}
-
+
@Override
final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index dd0a031..2ad5f53 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -69,7 +69,6 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected String intermediateTableRowDelimiter;
protected byte byteRowDelimiter;
- protected int counter;
protected MeasureIngester<?>[] aggrIngesters;
protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected Object[] measures;
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 9543f0a..03c925e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -18,10 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
@@ -35,9 +31,12 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
/**
* @author George Song (ysong1)
- *
*/
public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
@@ -50,7 +49,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
private BufferedMeasureCodec codec;
private MeasureAggregators aggs;
- private int counter;
+ private int vcounter = 0;
private int cuboidLevel;
private boolean[] needAggr;
private Object[] input;
@@ -90,12 +89,18 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
aggs.reset();
for (Text value : values) {
+
+ if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling value with ordinal: " + vcounter + "!");
+ }
+
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
if (cuboidLevel > 0) {
aggs.aggregate(input, needAggr);
} else {
aggs.aggregate(input);
}
+
}
aggs.collectStates(result);
@@ -104,10 +109,6 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
outputValue.set(valueBuf.array(), 0, valueBuf.position());
context.write(key, outputValue);
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index d9c5312..f4e8af7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -18,11 +18,10 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import java.io.IOException;
/**
* @author George Song (ysong1)
@@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
@Override
public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
try {
//put a record into the shared bytesSplitter
String[] row = flatTableInputFormat.parseMapperInput(value);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 15bfd2e..cf5abaf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -18,18 +18,7 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
@@ -51,7 +40,17 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -64,7 +63,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
private CubeSegment cubeSegment;
private IMRTableInputFormat flatTableInputFormat;
- private int counter;
private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
private Future<?> future;
@@ -120,10 +118,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
while (!future.isDone()) {
if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
break;
}
}
@@ -131,10 +125,10 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
@Override
protected void doCleanup(Context context) throws IOException, InterruptedException {
- logger.info("Totally handled " + counter + " records!");
+ logger.info("Totally handled " + mapCounter + " records!");
while (!future.isDone()) {
- if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+ if (queue.offer(Collections.<String>emptyList(), 1, TimeUnit.SECONDS)) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index d0a7062..a57ddb8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -18,10 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
@@ -37,6 +33,10 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
/**
*/
public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
@@ -46,7 +46,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
private BufferedMeasureCodec codec;
private MeasureAggregators aggs;
- private int counter;
+ private int vcounter;
private Object[] input;
private Object[] result;
@@ -74,10 +74,14 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
@Override
public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
-
aggs.reset();
for (ByteArrayWritable value : values) {
+
+ if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling value with ordinal: " + vcounter);
+ }
+
codec.decode(value.asBuffer(), input);
aggs.aggregate(input);
}
@@ -92,10 +96,6 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
context.write(outputKey, outputValue);
- counter++;
- if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + counter + " records!");
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 8107e52..54d9e23 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,9 +18,6 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.util.Collection;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
@@ -41,9 +38,11 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+
/**
* @author George Song (ysong1)
- *
*/
public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
@@ -97,9 +96,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
for (int i = 0; i < parentCuboidIdActualLength; i++) {
if ((mask & parentCuboidId) > 0) {// if the this bit position equals
- // 1
+ // 1
if ((mask & childCuboidId) > 0) {// if the child cuboid has this
- // column
+ // column
System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
offset += splitBuffers[index].length;
}
@@ -123,24 +122,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
long cuboidId = rowKeySplitter.split(key.getBytes());
Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
// if still empty or null
if (myChildren == null || myChildren.size() == 0) {
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
- skipCounter++;
- if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Skipped " + skipCounter + " records!");
+ if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Skipping record with ordinal " + skipCounter);
}
return;
}
context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
- handleCounter++;
- if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handled " + handleCounter + " records!");
+ if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+ logger.info("Handling record with ordinal: " + handleCounter);
}
for (Long child : myChildren) {