You are viewing a plain-text version of this content; the hyperlink to the canonical version (originally attached to the word "here") was not preserved in this text conversion.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/29 18:59:44 UTC

git commit: TEZ-589. Bugs in Broadcast fragment fetch code path (bikas)

Updated Branches:
  refs/heads/master 84c6c0bc1 -> 8496fccc1


TEZ-589. Bugs in Broadcast fragment fetch code path (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8496fccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8496fccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8496fccc

Branch: refs/heads/master
Commit: 8496fccc1969a23109fac826f1492aa90c4f3300
Parents: 84c6c0b
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Oct 29 10:57:06 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Oct 29 10:57:06 2013 -0700

----------------------------------------------------------------------
 .../broadcast/input/BroadcastInputManager.java  |  3 ++-
 .../broadcast/output/FileBasedKVWriter.java     |  2 +-
 .../runtime/library/common/sort/impl/IFile.java |  6 +++++
 .../common/sort/impl/IFileInputStream.java      | 28 +++++++++++++++++---
 .../task/local/output/TezTaskOutputFiles.java   |  9 ++++---
 .../shuffle/common/DiskFetchedInput.java        | 20 +++++++++++++-
 6 files changed, 58 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index add7371..512db50 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -84,7 +84,8 @@ public class BroadcastInputManager implements FetchedInputAllocator,
 
     this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
     
-    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + 
+    this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 3bfa3f3..7f58594 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -90,7 +90,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
 
     this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
         outputContext);
-    LOG.info("Craeted KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
+    LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
         : codec.getClass().getName()));
 
     initWriter();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index c9cbc55..324e87e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -492,6 +492,12 @@ public class IFile {
     
     public KeyState readRawKey(DataInputBuffer key) throws IOException {
       if (!positionToNextRecord(dataIn)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("currentKeyLength=" + currentKeyLength +
+              ", currentValueLength=" + currentValueLength +
+              ", bytesRead=" + bytesRead + 
+              ", length=" + fileLength);
+        }
         return KeyState.NO_KEY;
       }
       if(currentKeyLength == RLE_MARKER) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index 69ff394..e919d3a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -172,6 +172,7 @@ public class IFileInputStream extends InputStream {
    * At EOF, checksum is validated, but the checksum
    * bytes are not passed back in the buffer. 
    */
+  @Override
   public int read(byte[] b, int off, int len) throws IOException {
 
     if (currentOffset >= dataLength) {
@@ -233,14 +234,24 @@ public class IFileInputStream extends InputStream {
     
     // If we are trying to read past the end of data, just read
     // the left over data
+    int origLen = len;
     if (currentOffset + len > dataLength) {
-      len = (int) dataLength - (int)currentOffset;
+      len = (int) (dataLength - currentOffset);
     }
     
     int bytesRead = in.read(b, off, len);
 
     if (bytesRead < 0) {
-      throw new ChecksumException("Checksum Error", 0);
+      String mesg = " CurrentOffset=" + currentOffset +
+          ", offset=" + offset +
+          ", off=" + off +
+          ", dataLength=" + dataLength + 
+          ", origLen=" + origLen +
+          ", len=" + len +
+          ", length=" + length +
+          ", checksumSize=" + checksumSize;
+      LOG.info(mesg);
+      throw new ChecksumException("Checksum Error: " + mesg, 0);
     }
 
     checksum(b, off, bytesRead);
@@ -257,7 +268,18 @@ public class IFileInputStream extends InputStream {
       csum = new byte[checksumSize];
       IOUtils.readFully(in, csum, 0, checksumSize);
       if (!sum.compare(csum, 0)) {
-        throw new ChecksumException("Checksum Error", 0);
+        String mesg = "CurrentOffset=" + currentOffset +
+            ", off=" + offset +
+            ", dataLength=" + dataLength + 
+            ", origLen=" + origLen +
+            ", len=" + len +
+            ", length=" + length +
+            ", checksumSize=" + checksumSize+
+            ", csum=" + csum +
+            ", sum=" + sum; 
+        LOG.info(mesg);
+
+        throw new ChecksumException("Checksum Error: " + mesg, 0);
       }
     }
     return bytesRead;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 2c18b4e..847a0bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -105,6 +105,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    * @throws IOException
    */
   public Path getOutputFileForWrite() throws IOException {
+    // TODO how to write 2 different broadcast outputs?????
     Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
     return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
@@ -181,8 +182,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(
-        String.format(String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber)), size, conf);
+        String.format(SPILL_FILE_PATTERN,
+            uniqueId, spillNumber), size, conf);
   }
 
   /**
@@ -234,8 +235,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    */
   public Path getInputFileForWrite(int srcTaskId,
       long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
+    return lDirAlloc.getLocalPathForWrite(String.format(SPILL_FILE_PATTERN,
+        uniqueId, srcTaskId),
         size, conf);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8496fccc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index a98ce63..3cdf20e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
 import com.google.common.base.Preconditions;
@@ -40,7 +43,15 @@ public class DiskFetchedInput extends FetchedInput {
   private final FileSystem localFS;
   private final Path tmpOutputPath;
   private final Path outputPath;
+  
+  private static final long checkSumSize; 
 
+  static {
+    DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
+        Integer.MAX_VALUE);
+    checkSumSize = sum.getChecksumSize();
+  }
+  
   public DiskFetchedInput(long size,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler, Configuration conf,
@@ -58,12 +69,19 @@ public class DiskFetchedInput extends FetchedInput {
   public OutputStream getOutputStream() throws IOException {
     return localFS.create(tmpOutputPath);
   }
+  
+  // Assumes that the file written to disk is an IFile that has a checksum 
+  // at the end. The size in super is the real data size.
+  @Override
+  public long getSize() {
+    return super.getSize() + checkSumSize;
+  }
 
   @Override
   public InputStream getInputStream() throws IOException {
     return localFS.open(outputPath);
   }
-
+  
   @Override
   public void commit() throws IOException {
     if (state == State.PENDING) {