You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/07/27 16:09:12 UTC

tez git commit: TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 738f9a614 -> e272e370a


TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e272e370
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e272e370
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e272e370

Branch: refs/heads/TEZ-3334
Commit: e272e370ad4a8fbcb0b567de63651b5445d991b8
Parents: 738f9a6
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Jul 27 11:08:51 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Jul 27 11:08:51 2016 -0500

----------------------------------------------------------------------
 .../tez/auxservices/FadvisedChunkedFile.java    |  85 ++++++++++
 .../tez/auxservices/FadvisedFileRegion.java     | 167 +++++++++++++++++++
 .../apache/tez/auxservices/ShuffleHandler.java  |   2 -
 3 files changed, 252 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
new file mode 100644
index 0000000..e14b79d
--- /dev/null
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.auxservices;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
+
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
new file mode 100644
index 0000000..0e62345
--- /dev/null
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.auxservices;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
+
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier, int shuffleBufferSize,
+                            boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (readaheadPool != null && readaheadLength > 0) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    }
+  }
+
+  /**
+   * This method transfers data using local buffer. It transfers data from
+   * a disk to a local buffer in memory, and then it transfers data from the
+   * buffer to the target. This is used only if transferTo is disallowed in
+   * the configuration file. super.TransferTo does not perform well on Windows
+   * due to a small IO request generated. customShuffleTransfer can control
+   * the size of the IO requests by changing the size of the intermediate
+   * buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+              " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if the actualCount is not multiple
+        //of the byteBuffer size and file is big enough. In that case we cannot
+        //use flip method but we need to set buffer limit manually to trans.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans;
+        trans = 0;
+      }
+
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+
+      byteBuffer.clear();
+    }
+
+    return actualCount - trans;
+  }
+
+
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    super.releaseExternalResources();
+  }
+
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore.
+   */
+  public void transferSuccessful() {
+    if (manageOsCache && getCount() > 0) {
+      try {
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+            fd, getPosition(), getCount(), POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index e5b4c79..cccd1a0 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -65,8 +65,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.tez.mapreduce.hadoop.MRConfig;