You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/07/27 16:09:12 UTC
tez git commit: TEZ-3377. Remove ShuffleHandler dependency on
mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion (Kuhu Shukla via
jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 738f9a614 -> e272e370a
TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e272e370
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e272e370
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e272e370
Branch: refs/heads/TEZ-3334
Commit: e272e370ad4a8fbcb0b567de63651b5445d991b8
Parents: 738f9a6
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Jul 27 11:08:51 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Jul 27 11:08:51 2016 -0500
----------------------------------------------------------------------
.../tez/auxservices/FadvisedChunkedFile.java | 85 ++++++++++
.../tez/auxservices/FadvisedFileRegion.java | 167 +++++++++++++++++++
.../apache/tez/auxservices/ShuffleHandler.java | 2 -
3 files changed, 252 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
new file mode 100644
index 0000000..e14b79d
--- /dev/null
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.auxservices;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
+
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+
+ private ReadaheadRequest readaheadRequest;
+
+ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+ int chunkSize, boolean manageOsCache, int readaheadLength,
+ ReadaheadPool readaheadPool, String identifier) throws IOException {
+ super(file, position, count, chunkSize);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Object nextChunk() throws Exception {
+ if (manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool
+ .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+ getEndOffset(), readaheadRequest);
+ }
+ return super.nextChunk();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+ fd,
+ getStartOffset(), getEndOffset() - getStartOffset(),
+ POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
new file mode 100644
index 0000000..0e62345
--- /dev/null
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.auxservices;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
+
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
+ private ReadaheadRequest readaheadRequest;
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
+ super(file.getChannel(), position, count);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position)
+ throws IOException {
+ if (readaheadPool != null && readaheadLength > 0) {
+ readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+ getPosition() + position, readaheadLength,
+ getPosition() + getCount(), readaheadRequest);
+ }
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
+ }
+
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
+ }
+
+
+ @Override
+ public void releaseExternalResources() {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ super.releaseExternalResources();
+ }
+
+ /**
+ * Call when the transfer completes successfully so we can advise the OS that
+ * we don't need the region to be cached anymore.
+ */
+ public void transferSuccessful() {
+ if (manageOsCache && getCount() > 0) {
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+ fd, getPosition(), getCount(), POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e272e370/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index e5b4c79..cccd1a0 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -65,8 +65,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.tez.mapreduce.hadoop.MRConfig;