Posted to commits@tez.apache.org by rb...@apache.org on 2016/02/02 01:44:21 UTC

tez git commit: TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 870972d2d -> 72f561639


TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/72f56163
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/72f56163
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/72f56163

Branch: refs/heads/master
Commit: 72f561639f828f2d8a815d52460e44fe2ea56d3a
Parents: 870972d
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Feb 1 16:44:11 2016 -0800
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Feb 1 16:44:11 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/tools/TFileLoader.java  | 91 ++++++--------------
 .../org/apache/tez/tools/TFileRecordReader.java | 68 +++++++++++----
 3 files changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
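
The gist of the change: instead of materializing each TFile value into a BytesWritable/Text buffer (which is bounded by the 2 GB byte[] limit), TFileRecordReader now wraps the entry's value stream in a BufferedReader and emits one line per nextKeyValue() call. A minimal standalone sketch of that streaming pattern follows; it is not part of the patch, the class name TFileLineDump and the driver code around it are made up for illustration, and only the org.apache.hadoop.io.file.tfile.TFile calls mirror what the patch relies on (entry.getValueStream(), scanner.advance()).

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.file.tfile.TFile;

  public class TFileLineDump {
    // Illustrative only: stream every value of a tfile line by line, so a single
    // value larger than 2 GB never has to fit into one byte[]/Text buffer.
    public static void dump(Configuration conf, Path path) throws Exception {
      FileSystem fs = path.getFileSystem(conf);
      long len = fs.getFileStatus(path).getLen();
      FSDataInputStream in = fs.open(path);
      TFile.Reader reader = new TFile.Reader(in, len, conf);
      TFile.Reader.Scanner scanner = reader.createScanner();
      try {
        while (!scanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = scanner.entry();
          BufferedReader lines =
              new BufferedReader(new InputStreamReader(entry.getValueStream()));
          for (String line = lines.readLine(); line != null; line = lines.readLine()) {
            System.out.println(line);   // process one line of a possibly huge value
          }
          scanner.advance();
        }
      } finally {
        scanner.close();
        reader.close();
        in.close();
      }
    }
  }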


http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a550015..6bff146 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles.
   TEZ-3081. Update tez website for trademarks feedback.
   TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
   TEZ-3079. Fix tez-tfile parser documentation.

http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
index 7fbcbf6..18e9940 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
@@ -19,16 +19,11 @@
 package org.apache.tez.tools;
 
 import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.Expression;
@@ -36,17 +31,15 @@ import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.regex.Pattern;
 
 /**
@@ -57,74 +50,46 @@ public class TFileLoader extends FileInputLoadFunc implements LoadMetadata {
 
   private static final Logger LOG = LoggerFactory.getLogger(TFileLoader.class);
 
-  private TFileRecordReader recReader = null;
+  protected TFileRecordReader recReader = null;
 
-  private BufferedReader bufReader;
   private Text currentKey;
   private final TupleFactory tupleFactory = TupleFactory.getInstance();
 
   private final Pattern PATTERN = Pattern.compile(":");
 
-  /**
-   * We get one complete TFile per KV read.
-   * Add a BufferedReader so that we can scan a line at a time.
-   *
-   * @throws java.io.IOException
-   * @throws InterruptedException
-   */
-  //TODO: tasks can sometime throw OOM when single TFile is way too large. Adjust mem accordinly.
-  private void setupReader() throws IOException, InterruptedException {
-    if (recReader.nextKeyValue() && bufReader == null) {
-      currentKey = recReader.getCurrentKey();
-      Text val = recReader.getCurrentValue();
-      bufReader = new BufferedReader(new StringReader(val.toString()));
-    }
-  }
-
   @Override
   public Tuple getNext() throws IOException {
     try {
-      String line = readLine();
-      if (line != null) {
-        //machine, key, line
-        Tuple tuple = tupleFactory.newTuple(3);
-        if (currentKey != null) {
-          String[] data = PATTERN.split(currentKey.toString());
-          if (data == null || data.length != 2) {
-            LOG.warn("unable to parse " + currentKey.toString());
-            return null;
-          }
-          tuple.set(0, data[0]);
-          tuple.set(1, data[1]);
-        } else {
-          tuple.set(0, "");
-          tuple.set(1, "");
+      if (!recReader.nextKeyValue()) {
+        return null;
+      }
+
+      currentKey = recReader.getCurrentKey();
+      String line = recReader.getCurrentValue().toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("currentKey: " + currentKey
+                + ", line=" + line);
+      }
+      //Tuple would be of format: machine, key, line
+      Tuple tuple = tupleFactory.newTuple(3);
+      if (currentKey != null) {
+        String[] data = PATTERN.split(currentKey.toString());
+        if (data == null || data.length != 2) {
+          LOG.warn("unable to parse " + currentKey.toString());
+          return null;
         }
-        tuple.set(2, line); //line
-        return tuple;
+        tuple.set(0, data[0]);
+        tuple.set(1, data[1]);
+      } else {
+        tuple.set(0, "");
+        tuple.set(1, "");
       }
-    } catch (IOException e) {
-      return null;
+      //set the line field
+      tuple.set(2, line);
+      return tuple;
     } catch (InterruptedException e) {
       return null;
     }
-    return null;
-  }
-
-  private String readLine() throws IOException, InterruptedException {
-    String line = null;
-    if (bufReader == null) {
-      setupReader();
-    }
-    line = bufReader.readLine();
-    if (line == null) { //end of stream. Move to the next reader
-      bufReader = null;
-      setupReader();
-      if (bufReader != null) {
-        line = bufReader.readLine();
-      }
-    }
-    return line;
   }
 
   public static class TFileInputFormat extends

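With the reader change below, TFileLoader.getNext() no longer needs its own BufferedReader: each nextKeyValue() call already yields one line, and the same "fileName:realKey" key is repeated for every line of a large value. A hedged sketch of how a caller sees that contract; TFileReaderDriver is hypothetical, the split/context are assumed to come from the framework, and only the standard org.apache.hadoop.mapreduce.RecordReader API is used.

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;
  import org.apache.tez.tools.TFileRecordReader;

  public class TFileReaderDriver {
    // Illustrative only: the split and context would normally be supplied by the
    // InputFormat / MR framework; building them is out of scope here.
    public static void read(InputSplit split, TaskAttemptContext context) throws Exception {
      RecordReader<Text, Text> reader = new TFileRecordReader();
      reader.initialize(split, context);
      try {
        while (reader.nextKeyValue()) {
          Text key = reader.getCurrentKey();     // "fileName:realKey"
          Text line = reader.getCurrentValue();  // one line of the (possibly >2 GB) value
          // the key repeats for every line that belongs to the same TFile entry
          System.out.println(key + "\t" + line);
        }
      } finally {
        reader.close();
      }
    }
  }
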
http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
index 70c0ee1..4d6c0f2 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.tools;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +33,14 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
+import java.io.BufferedReader;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStreamReader;
 
 /**
- * Simple record reader which reads the TFile and emits it as key, value pair
+ * Simple record reader which reads the TFile and emits it as key, value pair.
+ * If value has multiple lines, read one line at a time.
  */
 public class TFileRecordReader extends RecordReader<Text, Text> {
 
@@ -43,17 +48,22 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
 
   private long start, end;
 
-  private Path splitPath;
+  @VisibleForTesting
+  protected Path splitPath;
   private FSDataInputStream fin;
-  private TFile.Reader reader;
-  private TFile.Reader.Scanner scanner;
+
+  @VisibleForTesting
+  protected TFile.Reader reader;
+  @VisibleForTesting
+  protected TFile.Reader.Scanner scanner;
 
   private Text key = new Text();
   private Text value = new Text();
 
-  private BytesWritable valueBytesWritable = new BytesWritable();
   private BytesWritable keyBytesWritable = new BytesWritable();
 
+  private BufferedReader currentValueReader;
+
   @Override public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
     FileSplit fileSplit = (FileSplit) split;
@@ -69,22 +79,46 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
     scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
   }
 
+  private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
+    entry.getKey(keyBytesWritable);
+    //splitpath contains the machine name. Create the key as splitPath + realKey
+    String keyStr = new StringBuilder()
+        .append(splitPath.getName()).append(":")
+        .append(new String(keyBytesWritable.getBytes()))
+        .toString();
+
+    /**
+     * In certain cases, values can be huge (files > 2 GB). Stream is
+     * better to handle such scenarios.
+     */
+    currentValueReader = new BufferedReader(
+        new InputStreamReader(entry.getValueStream()));
+    key.set(keyStr);
+    String line = currentValueReader.readLine();
+    value.set((line == null) ? "" : line);
+  }
+
   @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-    valueBytesWritable.setSize(0);
-    if (!scanner.advance()) {
+    if (currentValueReader != null) {
+      //Still at the old entry reading line by line
+      String line = currentValueReader.readLine();
+      if (line != null) {
+        value.set(line);
+        return true;
+      } else {
+        //Read through all lines in the large value stream. Move to next KV.
+        scanner.advance();
+      }
+    }
+
+    try {
+      populateKV(scanner.entry());
+      return true;
+    } catch(EOFException eofException) {
+      key = null;
       value = null;
       return false;
     }
-    TFile.Reader.Scanner.Entry entry = scanner.entry();
-    //populate key, value
-    entry.getKey(keyBytesWritable);
-    StringBuilder k = new StringBuilder();
-    //split path contains the machine name. Create the key as splitPath + realKey
-    k.append(splitPath.getName()).append(":").append(new String(keyBytesWritable.getBytes()));
-    key.set(k.toString());
-    entry.getValue(valueBytesWritable);
-    value.set(valueBytesWritable.getBytes());
-    return true;
   }
 
   @Override public Text getCurrentKey() throws IOException, InterruptedException {