You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/02/02 01:44:21 UTC
tez git commit: TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 870972d2d -> 72f561639
TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/72f56163
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/72f56163
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/72f56163
Branch: refs/heads/master
Commit: 72f561639f828f2d8a815d52460e44fe2ea56d3a
Parents: 870972d
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Mon Feb 1 16:44:11 2016 -0800
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Mon Feb 1 16:44:11 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/tools/TFileLoader.java | 91 ++++++--------------
.../org/apache/tez/tools/TFileRecordReader.java | 68 +++++++++++----
3 files changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a550015..6bff146 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles.
TEZ-3081. Update tez website for trademarks feedback.
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
TEZ-3079. Fix tez-tfile parser documentation.
http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
index 7fbcbf6..18e9940 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
@@ -19,16 +19,11 @@
package org.apache.tez.tools;
import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.Expression;
@@ -36,17 +31,15 @@ import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.StringReader;
import java.util.regex.Pattern;
/**
@@ -57,74 +50,46 @@ public class TFileLoader extends FileInputLoadFunc implements LoadMetadata {
private static final Logger LOG = LoggerFactory.getLogger(TFileLoader.class);
- private TFileRecordReader recReader = null;
+ protected TFileRecordReader recReader = null;
- private BufferedReader bufReader;
private Text currentKey;
private final TupleFactory tupleFactory = TupleFactory.getInstance();
private final Pattern PATTERN = Pattern.compile(":");
- /**
- * We get one complete TFile per KV read.
- * Add a BufferedReader so that we can scan a line at a time.
- *
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- //TODO: tasks can sometime throw OOM when single TFile is way too large. Adjust mem accordinly.
- private void setupReader() throws IOException, InterruptedException {
- if (recReader.nextKeyValue() && bufReader == null) {
- currentKey = recReader.getCurrentKey();
- Text val = recReader.getCurrentValue();
- bufReader = new BufferedReader(new StringReader(val.toString()));
- }
- }
-
@Override
public Tuple getNext() throws IOException {
try {
- String line = readLine();
- if (line != null) {
- //machine, key, line
- Tuple tuple = tupleFactory.newTuple(3);
- if (currentKey != null) {
- String[] data = PATTERN.split(currentKey.toString());
- if (data == null || data.length != 2) {
- LOG.warn("unable to parse " + currentKey.toString());
- return null;
- }
- tuple.set(0, data[0]);
- tuple.set(1, data[1]);
- } else {
- tuple.set(0, "");
- tuple.set(1, "");
+ if (!recReader.nextKeyValue()) {
+ return null;
+ }
+
+ currentKey = recReader.getCurrentKey();
+ String line = recReader.getCurrentValue().toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentKey: " + currentKey
+ + ", line=" + line);
+ }
+ //Tuple would be of format: machine, key, line
+ Tuple tuple = tupleFactory.newTuple(3);
+ if (currentKey != null) {
+ String[] data = PATTERN.split(currentKey.toString());
+ if (data == null || data.length != 2) {
+ LOG.warn("unable to parse " + currentKey.toString());
+ return null;
}
- tuple.set(2, line); //line
- return tuple;
+ tuple.set(0, data[0]);
+ tuple.set(1, data[1]);
+ } else {
+ tuple.set(0, "");
+ tuple.set(1, "");
}
- } catch (IOException e) {
- return null;
+ //set the line field
+ tuple.set(2, line);
+ return tuple;
} catch (InterruptedException e) {
return null;
}
- return null;
- }
-
- private String readLine() throws IOException, InterruptedException {
- String line = null;
- if (bufReader == null) {
- setupReader();
- }
- line = bufReader.readLine();
- if (line == null) { //end of stream. Move to the next reader
- bufReader = null;
- setupReader();
- if (bufReader != null) {
- line = bufReader.readLine();
- }
- }
- return line;
}
public static class TFileInputFormat extends
http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
index 70c0ee1..4d6c0f2 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.tez.tools;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +33,14 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import java.io.BufferedReader;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStreamReader;
/**
- * Simple record reader which reads the TFile and emits it as key, value pair
+ * Simple record reader which reads the TFile and emits it as key, value pair.
+ * If value has multiple lines, read one line at a time.
*/
public class TFileRecordReader extends RecordReader<Text, Text> {
@@ -43,17 +48,22 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
private long start, end;
- private Path splitPath;
+ @VisibleForTesting
+ protected Path splitPath;
private FSDataInputStream fin;
- private TFile.Reader reader;
- private TFile.Reader.Scanner scanner;
+
+ @VisibleForTesting
+ protected TFile.Reader reader;
+ @VisibleForTesting
+ protected TFile.Reader.Scanner scanner;
private Text key = new Text();
private Text value = new Text();
- private BytesWritable valueBytesWritable = new BytesWritable();
private BytesWritable keyBytesWritable = new BytesWritable();
+ private BufferedReader currentValueReader;
+
@Override public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) split;
@@ -69,22 +79,46 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
}
+ private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
+ entry.getKey(keyBytesWritable);
+ //splitpath contains the machine name. Create the key as splitPath + realKey
+ String keyStr = new StringBuilder()
+ .append(splitPath.getName()).append(":")
+ .append(new String(keyBytesWritable.getBytes()))
+ .toString();
+
+ /**
+ * In certain cases, values can be huge (files > 2 GB). Stream is
+ * better to handle such scenarios.
+ */
+ currentValueReader = new BufferedReader(
+ new InputStreamReader(entry.getValueStream()));
+ key.set(keyStr);
+ String line = currentValueReader.readLine();
+ value.set((line == null) ? "" : line);
+ }
+
@Override public boolean nextKeyValue() throws IOException, InterruptedException {
- valueBytesWritable.setSize(0);
- if (!scanner.advance()) {
+ if (currentValueReader != null) {
+ //Still at the old entry reading line by line
+ String line = currentValueReader.readLine();
+ if (line != null) {
+ value.set(line);
+ return true;
+ } else {
+ //Read through all lines in the large value stream. Move to next KV.
+ scanner.advance();
+ }
+ }
+
+ try {
+ populateKV(scanner.entry());
+ return true;
+ } catch(EOFException eofException) {
+ key = null;
value = null;
return false;
}
- TFile.Reader.Scanner.Entry entry = scanner.entry();
- //populate key, value
- entry.getKey(keyBytesWritable);
- StringBuilder k = new StringBuilder();
- //split path contains the machine name. Create the key as splitPath + realKey
- k.append(splitPath.getName()).append(":").append(new String(keyBytesWritable.getBytes()));
- key.set(k.toString());
- entry.getValue(valueBytesWritable);
- value.set(valueBytesWritable.getBytes());
- return true;
}
@Override public Text getCurrentKey() throws IOException, InterruptedException {