You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:44 UTC
[45/50] [abbrv] tez git commit: TEZ-2714. Fix comments from review - part 3. (sseth)
TEZ-2714. Fix comments from review - part 3. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fbf1fcc2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fbf1fcc2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fbf1fcc2
Branch: refs/heads/master
Commit: fbf1fcc2fcc439a640fc2c5ae9f37f807808a3e9
Parents: 2ef4626
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 14 14:20:13 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:15:24 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../api/ServicePluginsDescriptor.java | 20 +++
.../dag/app/rm/TaskSchedulerEventHandler.java | 5 +-
.../tez/dag/app/rm/container/AMContainer.java | 1 -
.../apache/tez/dag/app/MockDAGAppMaster.java | 20 ++-
.../app/rm/TestTaskSchedulerEventHandler.java | 2 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 5 +-
.../tez/shufflehandler/FadvisedChunkedFile.java | 78 ---------
.../tez/shufflehandler/FadvisedFileRegion.java | 160 -------------------
.../apache/tez/shufflehandler/IndexCache.java | 12 +-
.../tez/shufflehandler/ShuffleHandler.java | 80 +---------
11 files changed, 45 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 8a8e257..fed203a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -49,5 +49,6 @@ ALL CHANGES:
TEZ-2707. Fix comments from reviews - part 2.
TEZ-2713. Add tests for node handling when there's multiple schedulers.
TEZ-2721. rebase 08/14
+ TEZ-2714. Fix comments from review - part 3.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index 113b7db..2dabed0 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -19,6 +19,7 @@ import java.util.Arrays;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.TezConfiguration;
/**
* An {@link ServicePluginsDescriptor} describes the list of plugins running within the AM for
@@ -71,6 +72,13 @@ public class ServicePluginsDescriptor {
/**
* Create a service plugin descriptor with the provided plugins. Also allows specification of whether
* in-AM execution is enabled. Container execution is enabled by default.
+ *
+ * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+ * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+ * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+ * cannot be controlled yet, and is the full AM heap for each task.
+ * TODO: TEZ-2722
+ *
* @param enableUber whether to enable execution in the AM or not
* @param taskSchedulerDescriptor the task scheduler plugin descriptors
* @param containerLauncherDescriptors the container launcher plugin descriptors
@@ -89,6 +97,12 @@ public class ServicePluginsDescriptor {
* Create a service plugin descriptor with the provided plugins. Also allows specification of whether
* container execution and in-AM execution will be enabled.
*
+ * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+ * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+ * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+ * cannot be controlled yet, and is the full AM heap for each task.
+ * TODO: TEZ-2722
+ *
* @param enableContainers whether to enable execution in containers
* @param enableUber whether to enable execution in the AM or not
* @param taskSchedulerDescriptor the task scheduler plugin descriptors
@@ -108,6 +122,12 @@ public class ServicePluginsDescriptor {
* Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container
* execution is enabled by default
*
+ * Note on Uber mode: This is NOT fully supported at the moment. Tasks will be launched within the
+ * AM process itself, controlled by {@link TezConfiguration#TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS}.
+ * The AM will need to be sized correctly for the tasks. Memory allocation to the running task
+ * cannot be controlled yet, and is the full AM heap for each task.
+ * TODO: TEZ-2722
+ *
* @param enableUber whether to enable execution in the AM or not
* @return a {@link ServicePluginsDescriptor} instance
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index a127ddf..374189a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -443,7 +443,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
}
@VisibleForTesting
- protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+ protected void instantiateSchedulers(String host, int port, String trackingUrl,
+ AppContext appContext) {
// Iterate over the list and create all the taskSchedulers
int j = 0;
for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
@@ -472,7 +473,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
// always try to connect to AM and proxy the response. hence it wont work if the webUIService
// is not enabled.
String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
- instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+ instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
for (int i = 0 ; i < taskSchedulers.length ; i++) {
taskSchedulerServiceWrappers[i].init(getConfig());
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 4b2d528..8f5034e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.records.TezTaskAttemptID;
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index fe3e4ef..b09eb86 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -56,8 +58,6 @@ import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
@@ -325,11 +325,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
}
- private void doHeartbeat(TaskHeartbeatRequest request, ContainerData cData) throws Exception {
+ private void doHeartbeat(TezHeartbeatRequest request, ContainerData cData) throws Exception {
long startTime = System.nanoTime();
long startCpuTime = threadMxBean.getCurrentThreadCpuTime();
- TaskHeartbeatResponse response = taListener.heartbeat(request);
- if (response.isShouldDie()) {
+ TezHeartbeatResponse response = taskCommunicator.getUmbilical().heartbeat(request);
+ if (response.shouldDie()) {
cData.remove();
} else {
cData.nextFromEventId = response.getNextFromEventId();
@@ -417,9 +417,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
- TaskHeartbeatRequest request =
- new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
- 50000);
+ TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
+ cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
doHeartbeat(request, cData);
} else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
@@ -430,9 +429,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
new TaskAttemptCompletedEvent(), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
- TaskHeartbeatRequest request =
- new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
- 10000);
+ TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
+ cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
doHeartbeat(request, cData);
cData.clear();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 1550085..c85be6c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -117,7 +117,7 @@ public class TestTaskSchedulerEventHandler {
}
@Override
- protected void instantiateScheduelrs(String host, int port, String trackingUrl,
+ protected void instantiateSchedulers(String host, int port, String trackingUrl,
AppContext appContext) {
taskSchedulers[0] = mockTaskScheduler;
taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 0746507..c13ca5a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -147,7 +147,8 @@ class TestTaskSchedulerHelpers {
}
@Override
- public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+ public void instantiateSchedulers(String host, int port, String trackingUrl,
+ AppContext appContext) {
TaskSchedulerContext taskSchedulerContext =
new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
defaultPayload);
@@ -166,7 +167,7 @@ class TestTaskSchedulerHelpers {
@Override
public void serviceStart() {
- instantiateScheduelrs("host", 0, "", appContext);
+ instantiateSchedulers("host", 0, "", appContext);
// Init the service so that reuse configuration is picked up.
((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
((AbstractService)taskSchedulerServiceWrappers[0]).start();
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
deleted file mode 100644
index 294add6..0000000
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.shufflehandler;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FadvisedChunkedFile extends ChunkedFile {
-
- private static final Logger LOG = LoggerFactory.getLogger(FadvisedChunkedFile.class);
-
- private final boolean manageOsCache;
- private final int readaheadLength;
- private final ReadaheadPool readaheadPool;
- private final FileDescriptor fd;
- private final String identifier;
-
- private ReadaheadRequest readaheadRequest;
-
- public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
- int chunkSize, boolean manageOsCache, int readaheadLength,
- ReadaheadPool readaheadPool, String identifier) throws IOException {
- super(file, position, count, chunkSize);
- this.manageOsCache = manageOsCache;
- this.readaheadLength = readaheadLength;
- this.readaheadPool = readaheadPool;
- this.fd = file.getFD();
- this.identifier = identifier;
- }
-
- @Override
- public Object nextChunk() throws Exception {
- if (manageOsCache && readaheadPool != null) {
- readaheadRequest = readaheadPool
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
- getEndOffset(), readaheadRequest);
- }
- return super.nextChunk();
- }
-
- @Override
- public void close() throws Exception {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
- try {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
- fd,
- getStartOffset(), getEndOffset() - getStartOffset(),
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- } catch (Throwable t) {
- LOG.warn("Failed to manage OS cache for " + identifier, t);
- }
- }
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
deleted file mode 100644
index e5392d3..0000000
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.shufflehandler;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FadvisedFileRegion extends DefaultFileRegion {
-
- private static final Logger LOG = LoggerFactory.getLogger(FadvisedFileRegion.class);
-
- private final boolean manageOsCache;
- private final int readaheadLength;
- private final ReadaheadPool readaheadPool;
- private final FileDescriptor fd;
- private final String identifier;
- private final long count;
- private final long position;
- private final int shuffleBufferSize;
- private final boolean shuffleTransferToAllowed;
- private final FileChannel fileChannel;
-
- private ReadaheadRequest readaheadRequest;
-
- public FadvisedFileRegion(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier, int shuffleBufferSize,
- boolean shuffleTransferToAllowed) throws IOException {
- super(file.getChannel(), position, count);
- this.manageOsCache = manageOsCache;
- this.readaheadLength = readaheadLength;
- this.readaheadPool = readaheadPool;
- this.fd = file.getFD();
- this.identifier = identifier;
- this.fileChannel = file.getChannel();
- this.count = count;
- this.position = position;
- this.shuffleBufferSize = shuffleBufferSize;
- this.shuffleTransferToAllowed = shuffleTransferToAllowed;
- }
-
- @Override
- public long transferTo(WritableByteChannel target, long position)
- throws IOException {
- if (manageOsCache && readaheadPool != null) {
- readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- getPosition() + position, readaheadLength,
- getPosition() + getCount(), readaheadRequest);
- }
-
- if(this.shuffleTransferToAllowed) {
- return super.transferTo(target, position);
- } else {
- return customShuffleTransfer(target, position);
- }
- }
-
- /**
- * This method transfers data using local buffer. It transfers data from
- * a disk to a local buffer in memory, and then it transfers data from the
- * buffer to the target. This is used only if transferTo is disallowed in
- * the configuration file. super.TransferTo does not perform well on Windows
- * due to a small IO request generated. customShuffleTransfer can control
- * the size of the IO requests by changing the size of the intermediate
- * buffer.
- */
- @VisibleForTesting
- long customShuffleTransfer(WritableByteChannel target, long position)
- throws IOException {
- long actualCount = this.count - position;
- if (actualCount < 0 || position < 0) {
- throw new IllegalArgumentException(
- "position out of range: " + position +
- " (expected: 0 - " + (this.count - 1) + ')');
- }
- if (actualCount == 0) {
- return 0L;
- }
-
- long trans = actualCount;
- int readSize;
- ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
-
- while(trans > 0L &&
- (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
- //adjust counters and buffer limit
- if(readSize < trans) {
- trans -= readSize;
- position += readSize;
- byteBuffer.flip();
- } else {
- //We can read more than we need if the actualCount is not multiple
- //of the byteBuffer size and file is big enough. In that case we cannot
- //use flip method but we need to set buffer limit manually to trans.
- byteBuffer.limit((int)trans);
- byteBuffer.position(0);
- position += trans;
- trans = 0;
- }
-
- //write data to the target
- while(byteBuffer.hasRemaining()) {
- target.write(byteBuffer);
- }
-
- byteBuffer.clear();
- }
-
- return actualCount - trans;
- }
-
-
- @Override
- public void releaseExternalResources() {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- super.releaseExternalResources();
- }
-
- /**
- * Call when the transfer completes successfully so we can advise the OS that
- * we don't need the region to be cached anymore.
- */
- public void transferSuccessful() {
- if (manageOsCache && getCount() > 0) {
- try {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
- fd, getPosition(), getCount(),
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- } catch (Throwable t) {
- LOG.warn("Failed to manage OS cache for " + identifier, t);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
index 5a45917..e358fcc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -1,11 +1,7 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
http://git-wip-us.apache.org/repos/asf/tez/blob/fbf1fcc2/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 8cbb8c7..046ce18 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -33,7 +33,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,15 +53,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.mapred.FadvisedChunkedFile;
+import org.apache.hadoop.mapred.FadvisedFileRegion;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
@@ -199,27 +194,6 @@ public class ShuffleHandler {
private static final AtomicBoolean initing = new AtomicBoolean(false);
private static ShuffleHandler INSTANCE;
- @Metrics(about="Shuffle output metrics", context="mapred")
- static class ShuffleMetrics implements ChannelFutureListener {
- @Metric("Shuffle output in bytes")
- MutableCounterLong shuffleOutputBytes;
- @Metric("# of failed shuffle outputs")
- MutableCounterInt shuffleOutputsFailed;
- @Metric("# of succeeeded shuffle outputs")
- MutableCounterInt shuffleOutputsOK;
- @Metric("# of current shuffle connections")
- MutableGaugeInt shuffleConnections;
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- shuffleOutputsOK.incr();
- } else {
- shuffleOutputsFailed.incr();
- }
- shuffleConnections.decr();
- }
- }
public ShuffleHandler(Configuration conf) {
this.conf = conf;
@@ -299,57 +273,11 @@ public class ShuffleHandler {
}
public static ShuffleHandler get() {
- Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started");
+ Preconditions.checkState(started.get(),
+ "ShuffleHandler must be started before invoking started");
return INSTANCE;
}
- /**
- * Serialize the shuffle port into a ByteBuffer for use later on.
- * @param port the port to be sent to the ApplciationMaster
- * @return the serialized form of the port.
- */
- public static ByteBuffer serializeMetaData(int port) throws IOException {
- //TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
- }
-
- /**
- * A helper function to deserialize the metadata returned by ShuffleHandler.
- * @param meta the metadata returned by the ShuffleHandler
- * @return the port the Shuffle Handler is listening on to serve shuffle data.
- */
- public static int deserializeMetaData(ByteBuffer meta) throws IOException {
- //TODO this should be returning a class not just an int
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(meta);
- int port = in.readInt();
- return port;
- }
-
- /**
- * A helper function to serialize the JobTokenIdentifier to be sent to the
- * ShuffleHandler as ServiceData.
- * @param jobToken the job token to be used for authentication of
- * shuffle data requests.
- * @return the serialized version of the jobToken.
- */
- public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
- //TODO these bytes should be versioned
- DataOutputBuffer jobToken_dob = new DataOutputBuffer();
- jobToken.write(jobToken_dob);
- return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
- }
-
- static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(secret);
- Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
- jt.readFields(in);
- return jt;
- }
-
public int getPort() {
return port;
}