You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 16:15:47 UTC

[09/18] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number

KYLIN-2135 Enlarge FactDistinctColumns reducer number

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030

Branch: refs/heads/KYLIN-2006
Commit: 74214030272ffef275ccf0359b583b3278aec468
Parents: 47de961
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 26 19:35:20 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  5 ++
 .../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++
 .../kylin/engine/mr/DFSFileTableReader.java     | 59 ++++++++++++++++----
 .../kylin/engine/mr/common/BatchConstants.java  |  5 ++
 .../mr/steps/FactDistinctColumnPartitioner.java | 11 +---
 .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++-
 .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++-
 .../mr/steps/FactDistinctColumnsReducer.java    | 34 ++++++++++-
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++-
 9 files changed, 170 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d9d10bb..6d3e807 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
     }
 
+    // Reducer count read from "kylin.job.uhc.reducer.count" (default 3) for UHC (ultra high cardinality) columns, i.e. the shard-by columns and the global dictionary columns
+    public int getUHCReducerCount() {
+        return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 87bb93d..9893040 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
@@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider {
         }
         return holes;
     }
+
+    private static final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    // Flags the UHC (ultra high cardinality) columns: entry i is 1 when the i-th dictionary column on fact is a global-dictionary or shard-by column, otherwise 0
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+        int[] uhcIndex = new int[factDictCols.size()];
+
+        // flag the columns whose dictionary is built by the global dictionary builder
+        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+        if (dictionaryDescList != null) {
+            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+                if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+                    for (int i = 0; i < factDictCols.size(); i++) {
+                        if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                            uhcIndex[i] = 1;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        // flag the shard-by columns as well
+        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+        for (int i = 0; i < factDictCols.size(); i++) {
+            if (shardByColumns.contains(factDictCols.get(i))) {
+                uhcIndex[i] = 1;
+            }
+        }
+
+        return uhcIndex;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..dda1d6f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private RowReader reader;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        try {
-            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.reader = new CsvRowReader(fs, filePath);
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
+    public void close() {
+        for (RowReader reader : readerList) {
+            try {
+                if (reader != null)
+                    reader.close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
     String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
     int NORMAL_RECORD_LOG_THRESHOLD = 100000;
     int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+    /**
+     * Fully-qualified class name of the global dictionary builder
+     */
+    String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
 /**
  */
 public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
-    private Configuration conf;
-
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
-
         if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
         } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
-            // the last reducer is for merging hll
+            // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {
-            int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
-            return colIndex;
+            return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 28ee335..92da7d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+            // each UHC column is served by uhcReducerCount reducers instead of one, so it adds (uhcReducerCount - 1) extra
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for (int index : uhcIndex) {
+                if (index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+            // fail fast with the property the user can actually tune (the one read by getUHCReducerCount)
+            if (reducerCount > 255) {
+                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, but " + reducerCount + " are required; please decrease 'kylin.job.uhc.reducer.count'");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -98,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 logger.info("Found segment: " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected int[] dictionaryColumnIndex;
 
+    protected int uhcReducerCount;
+    protected int[] uhcIndex;
+    protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
-        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),  cubeDesc);
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
         dictionaryColumnIndex = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++) {
             TblColRef colRef = factDictCols.get(i);
@@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
             dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
         }
 
+        uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        int count = 0;
+        for (int i = 0; i < uhcIndex.length; i++) {
+            columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i);
+            if (uhcIndex[i] == 1) {
+                count++;
+            }
+        }
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index ecbc6c2..5b00381 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private boolean isStatistics = false;
     private boolean isPartitionCol = false;
     private KylinConfig cubeConfig;
+    private int uhcReducerCount;
+    private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+    private int taskId;
+
     protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     @Override
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
         boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         int numberOfTasks = context.getNumReduceTasks();
-        int taskId = context.getTaskAttemptID().getTaskID().getId();
+        taskId = context.getTaskAttemptID().getTaskID().getId();
+
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        initReducerIdToColumnIndex(config);
 
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
@@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             // col
             isStatistics = false;
             isPartitionCol = false;
-            col = columnList.get(taskId);
+            col = columnList.get(ReducerIdToColumnIndex.get(taskId));
             colValues = Lists.newLinkedList();
         }
     }
 
+    private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
+        final int[] uhcFlags = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        int firstReducerId = 0;
+        for (int colIdx = 0; colIdx < uhcFlags.length; colIdx++) {
+            final boolean isUhc = uhcFlags[colIdx] == 1;
+            // a UHC column maps uhcReducerCount consecutive reducer ids to itself, any other column exactly one
+            final int owned = isUhc ? Math.max(1, uhcReducerCount) : 1;
+            for (int k = 0; k < owned; k++) {
+                ReducerIdToColumnIndex.put(firstReducerId + k, colIdx);
+            }
+            firstReducerId += isUhc ? uhcReducerCount : 1;
+        }
+    }
+
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
         final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, col.getName());
+        final Path colDir = new Path(outputPath, col.getName());
+        final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+        final Path outputFile = new Path(colDir, fileName);
 
         FSDataOutputStream out = null;
         try {
+            if (!fs.exists(colDir)) {
+                fs.mkdirs(colDir);
+            }
+
             if (fs.exists(outputFile)) {
                 out = fs.append(outputFile);
                 logger.info("append file " + outputFile);

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 177c9f6..7a183b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 if (fieldValue == null)
                     continue;
                 int offset = keyBuffer.position();
-                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+                int reducerIndex;
+                if (uhcIndex[i] == 0) {
+                    //for the normal dictionary column
+                    reducerIndex = columnIndexToReducerBeginId.get(i);
+                } else {
+                    //for the uhc
+                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                }
+
+                keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
                 keyBuffer.put(Bytes.toBytes(fieldValue));
                 outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 context.write(outputKey, EMPTY_TEXT);