You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that did not survive plain-text conversion.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/29 18:59:44 UTC
git commit: TEZ-589. Bugs in Broadcast fragment fetch code path (bikas)
Updated Branches:
refs/heads/master 84c6c0bc1 -> 8496fccc1
TEZ-589. Bugs in Broadcast fragment fetch code path (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8496fccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8496fccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8496fccc
Branch: refs/heads/master
Commit: 8496fccc1969a23109fac826f1492aa90c4f3300
Parents: 84c6c0b
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Oct 29 10:57:06 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Oct 29 10:57:06 2013 -0700
----------------------------------------------------------------------
.../broadcast/input/BroadcastInputManager.java | 3 ++-
.../broadcast/output/FileBasedKVWriter.java | 2 +-
.../runtime/library/common/sort/impl/IFile.java | 6 +++++
.../common/sort/impl/IFileInputStream.java | 28 +++++++++++++++++---
.../task/local/output/TezTaskOutputFiles.java | 9 ++++---
.../shuffle/common/DiskFetchedInput.java | 20 +++++++++++++-
6 files changed, 58 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index add7371..512db50 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -84,7 +84,8 @@ public class BroadcastInputManager implements FetchedInputAllocator,
this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
- LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+ LOG.info("BroadcastInputManager -> " + "MemoryLimit: " +
+ this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 3bfa3f3..7f58594 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -90,7 +90,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
outputContext);
- LOG.info("Craeted KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
+ LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
: codec.getClass().getName()));
initWriter();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index c9cbc55..324e87e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -492,6 +492,12 @@ public class IFile {
public KeyState readRawKey(DataInputBuffer key) throws IOException {
if (!positionToNextRecord(dataIn)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentKeyLength=" + currentKeyLength +
+ ", currentValueLength=" + currentValueLength +
+ ", bytesRead=" + bytesRead +
+ ", length=" + fileLength);
+ }
return KeyState.NO_KEY;
}
if(currentKeyLength == RLE_MARKER) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index 69ff394..e919d3a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -172,6 +172,7 @@ public class IFileInputStream extends InputStream {
* At EOF, checksum is validated, but the checksum
* bytes are not passed back in the buffer.
*/
+ @Override
public int read(byte[] b, int off, int len) throws IOException {
if (currentOffset >= dataLength) {
@@ -233,14 +234,24 @@ public class IFileInputStream extends InputStream {
// If we are trying to read past the end of data, just read
// the left over data
+ int origLen = len;
if (currentOffset + len > dataLength) {
- len = (int) dataLength - (int)currentOffset;
+ len = (int) (dataLength - currentOffset);
}
int bytesRead = in.read(b, off, len);
if (bytesRead < 0) {
- throw new ChecksumException("Checksum Error", 0);
+ String mesg = " CurrentOffset=" + currentOffset +
+ ", offset=" + offset +
+ ", off=" + off +
+ ", dataLength=" + dataLength +
+ ", origLen=" + origLen +
+ ", len=" + len +
+ ", length=" + length +
+ ", checksumSize=" + checksumSize;
+ LOG.info(mesg);
+ throw new ChecksumException("Checksum Error: " + mesg, 0);
}
checksum(b, off, bytesRead);
@@ -257,7 +268,18 @@ public class IFileInputStream extends InputStream {
csum = new byte[checksumSize];
IOUtils.readFully(in, csum, 0, checksumSize);
if (!sum.compare(csum, 0)) {
- throw new ChecksumException("Checksum Error", 0);
+ String mesg = "CurrentOffset=" + currentOffset +
+ ", off=" + offset +
+ ", dataLength=" + dataLength +
+ ", origLen=" + origLen +
+ ", len=" + len +
+ ", length=" + length +
+ ", checksumSize=" + checksumSize+
+ ", csum=" + csum +
+ ", sum=" + sum;
+ LOG.info(mesg);
+
+ throw new ChecksumException("Checksum Error: " + mesg, 0);
}
}
return bytesRead;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 2c18b4e..847a0bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -105,6 +105,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
* @throws IOException
*/
public Path getOutputFileForWrite() throws IOException {
+ // TODO how to write 2 different broadcast outputs?????
Path attemptOutput =
new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
@@ -181,8 +182,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(
- String.format(String.format(SPILL_FILE_PATTERN,
- uniqueId, spillNumber)), size, conf);
+ String.format(SPILL_FILE_PATTERN,
+ uniqueId, spillNumber), size, conf);
}
/**
@@ -234,8 +235,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
*/
public Path getInputFileForWrite(int srcTaskId,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- uniqueId, getAttemptOutputDir().toString(), srcTaskId),
+ return lDirAlloc.getLocalPathForWrite(String.format(SPILL_FILE_PATTERN,
+ uniqueId, srcTaskId),
size, conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index a98ce63..3cdf20e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import com.google.common.base.Preconditions;
@@ -40,7 +43,15 @@ public class DiskFetchedInput extends FetchedInput {
private final FileSystem localFS;
private final Path tmpOutputPath;
private final Path outputPath;
+
+ private static final long checkSumSize;
+ static {
+ DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+ Integer.MAX_VALUE);
+ checkSumSize = sum.getChecksumSize();
+ }
+
public DiskFetchedInput(long size,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler, Configuration conf,
@@ -58,12 +69,19 @@ public class DiskFetchedInput extends FetchedInput {
public OutputStream getOutputStream() throws IOException {
return localFS.create(tmpOutputPath);
}
+
+ // Assumes that the file written to disk is an IFile that has a checksum
+ // at the end. The size in super is the real data size.
+ @Override
+ public long getSize() {
+ return super.getSize() + checkSumSize;
+ }
@Override
public InputStream getInputStream() throws IOException {
return localFS.open(outputPath);
}
-
+
@Override
public void commit() throws IOException {
if (state == State.PENDING) {