You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:24 UTC
[15/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
deleted file mode 100644
index b2a0b54..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Comparator;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-
-class MapOutput {
- private static final Log LOG = LogFactory.getLog(MapOutput.class);
- private static AtomicInteger ID = new AtomicInteger(0);
-
- public static enum Type {
- WAIT,
- MEMORY,
- DISK
- }
-
- private InputAttemptIdentifier attemptIdentifier;
- private final int id;
-
- private final MergeManager merger;
-
- private final long size;
-
- private final byte[] memory;
- private BoundedByteArrayOutputStream byteStream;
-
- private final FileSystem localFS;
- private final Path tmpOutputPath;
- private final Path outputPath;
- private final OutputStream disk;
-
- private final Type type;
-
- private final boolean primaryMapOutput;
-
- MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size,
- Configuration conf, LocalDirAllocator localDirAllocator,
- int fetcher, boolean primaryMapOutput,
- TezTaskOutputFiles mapOutputFile)
- throws IOException {
- this.id = ID.incrementAndGet();
- this.attemptIdentifier = attemptIdentifier;
- this.merger = merger;
-
- type = Type.DISK;
-
- memory = null;
- byteStream = null;
-
- this.size = size;
-
- this.localFS = FileSystem.getLocal(conf);
- outputPath =
- mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
- tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
- disk = localFS.create(tmpOutputPath);
-
- this.primaryMapOutput = primaryMapOutput;
- }
-
- MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size,
- boolean primaryMapOutput) {
- this.id = ID.incrementAndGet();
- this.attemptIdentifier = attemptIdentifier;
- this.merger = merger;
-
- type = Type.MEMORY;
- byteStream = new BoundedByteArrayOutputStream(size);
- memory = byteStream.getBuffer();
-
- this.size = size;
-
- localFS = null;
- disk = null;
- outputPath = null;
- tmpOutputPath = null;
-
- this.primaryMapOutput = primaryMapOutput;
- }
-
- public MapOutput(InputAttemptIdentifier attemptIdentifier) {
- this.id = ID.incrementAndGet();
- this.attemptIdentifier = attemptIdentifier;
-
- type = Type.WAIT;
- merger = null;
- memory = null;
- byteStream = null;
-
- size = -1;
-
- localFS = null;
- disk = null;
- outputPath = null;
- tmpOutputPath = null;
-
- this.primaryMapOutput = false;
-}
-
- public boolean isPrimaryMapOutput() {
- return primaryMapOutput;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof MapOutput) {
- return id == ((MapOutput)obj).id;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return id;
- }
-
- public Path getOutputPath() {
- return outputPath;
- }
-
- public byte[] getMemory() {
- return memory;
- }
-
- public BoundedByteArrayOutputStream getArrayStream() {
- return byteStream;
- }
-
- public OutputStream getDisk() {
- return disk;
- }
-
- public InputAttemptIdentifier getAttemptIdentifier() {
- return this.attemptIdentifier;
- }
-
- public Type getType() {
- return type;
- }
-
- public long getSize() {
- return size;
- }
-
- public void commit() throws IOException {
- if (type == Type.MEMORY) {
- merger.closeInMemoryFile(this);
- } else if (type == Type.DISK) {
- localFS.rename(tmpOutputPath, outputPath);
- merger.closeOnDiskFile(outputPath);
- } else {
- throw new IOException("Cannot commit MapOutput of type WAIT!");
- }
- }
-
- public void abort() {
- if (type == Type.MEMORY) {
- merger.unreserve(memory.length);
- } else if (type == Type.DISK) {
- try {
- localFS.delete(tmpOutputPath, false);
- } catch (IOException ie) {
- LOG.info("failure to clean up " + tmpOutputPath, ie);
- }
- } else {
- throw new IllegalArgumentException
- ("Cannot commit MapOutput with of type WAIT!");
- }
- }
-
- public String toString() {
- return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
- }
-
- public static class MapOutputComparator
- implements Comparator<MapOutput> {
- public int compare(MapOutput o1, MapOutput o2) {
- if (o1.id == o2.id) {
- return 0;
- }
-
- if (o1.size < o2.size) {
- return -1;
- } else if (o1.size > o2.size) {
- return 1;
- }
-
- if (o1.id < o2.id) {
- return -1;
- } else {
- return 1;
-
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
deleted file mode 100644
index b8792fb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ /dev/null
@@ -1,782 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.engine.hadoop.compat.NullProgressable;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@SuppressWarnings(value={"rawtypes"})
-public class MergeManager {
-
- private static final Log LOG = LogFactory.getLog(MergeManager.class);
-
- private final Configuration conf;
- private final FileSystem localFS;
- private final FileSystem rfs;
- private final LocalDirAllocator localDirAllocator;
-
- private final TezTaskOutputFiles mapOutputFile;
- private final Progressable nullProgressable = new NullProgressable();
- private final Combiner combiner;
-
- Set<MapOutput> inMemoryMergedMapOutputs =
- new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
- Set<MapOutput> inMemoryMapOutputs =
- new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
- private final InMemoryMerger inMemoryMerger;
-
- Set<Path> onDiskMapOutputs = new TreeSet<Path>();
- private final OnDiskMerger onDiskMerger;
-
- private final long memoryLimit;
- private long usedMemory;
- private long commitMemory;
- private final long maxSingleShuffleLimit;
-
- private final int memToMemMergeOutputsThreshold;
- private final long mergeThreshold;
-
- private final int ioSortFactor;
-
- private final ExceptionReporter exceptionReporter;
-
- private final TezInputContext inputContext;
-
- private final TezCounter spilledRecordsCounter;
-
- private final TezCounter reduceCombineInputCounter;
-
- private final TezCounter mergedMapOutputsCounter;
-
- private final CompressionCodec codec;
-
- private volatile boolean finalMergeComplete = false;
-
- public MergeManager(Configuration conf,
- FileSystem localFS,
- LocalDirAllocator localDirAllocator,
- TezInputContext inputContext,
- Combiner combiner,
- TezCounter spilledRecordsCounter,
- TezCounter reduceCombineInputCounter,
- TezCounter mergedMapOutputsCounter,
- ExceptionReporter exceptionReporter) {
- this.inputContext = inputContext;
- this.conf = conf;
- this.localDirAllocator = localDirAllocator;
- this.exceptionReporter = exceptionReporter;
-
- this.combiner = combiner;
-
- this.reduceCombineInputCounter = reduceCombineInputCounter;
- this.spilledRecordsCounter = spilledRecordsCounter;
- this.mergedMapOutputsCounter = mergedMapOutputsCounter;
- this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-
- this.localFS = localFS;
- this.rfs = ((LocalFileSystem)localFS).getRaw();
-
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
-
- final float maxInMemCopyUse =
- conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
- if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
- throw new IllegalArgumentException("Invalid value for " +
- TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
- maxInMemCopyUse);
- }
-
- // Allow unit tests to fix Runtime memory
- this.memoryLimit =
- (long)(conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
- * maxInMemCopyUse);
-
- this.ioSortFactor =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-
- final float singleShuffleMemoryLimitPercent =
- conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
- if (singleShuffleMemoryLimitPercent <= 0.0f
- || singleShuffleMemoryLimitPercent > 1.0f) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
- + singleShuffleMemoryLimitPercent);
- }
-
- this.maxSingleShuffleLimit =
- (long)(memoryLimit * singleShuffleMemoryLimitPercent);
- this.memToMemMergeOutputsThreshold =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS,
- ioSortFactor);
- this.mergeThreshold =
- (long)(this.memoryLimit *
- conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT));
- LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
- "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
- "mergeThreshold=" + mergeThreshold + ", " +
- "ioSortFactor=" + ioSortFactor + ", " +
- "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
- if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
- throw new RuntimeException("Invlaid configuration: "
- + "maxSingleShuffleLimit should be less than mergeThreshold"
- + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
- + "mergeThreshold: " + this.mergeThreshold);
- }
-
- boolean allowMemToMemMerge =
- conf.getBoolean(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
- if (allowMemToMemMerge) {
- this.memToMemMerger =
- new IntermediateMemoryToMemoryMerger(this,
- memToMemMergeOutputsThreshold);
- this.memToMemMerger.start();
- } else {
- this.memToMemMerger = null;
- }
-
- this.inMemoryMerger = new InMemoryMerger(this);
- this.inMemoryMerger.start();
-
- this.onDiskMerger = new OnDiskMerger(this);
- this.onDiskMerger.start();
- }
-
- public void waitForInMemoryMerge() throws InterruptedException {
- inMemoryMerger.waitForMerge();
- }
-
- private boolean canShuffleToMemory(long requestedSize) {
- return (requestedSize < maxSingleShuffleLimit);
- }
-
- final private MapOutput stallShuffle = new MapOutput(null);
-
- public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
- long requestedSize,
- int fetcher
- ) throws IOException {
- if (!canShuffleToMemory(requestedSize)) {
- LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize +
- " is greater than maxSingleShuffleLimit (" +
- maxSingleShuffleLimit + ")");
- return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf,
- localDirAllocator, fetcher, true,
- mapOutputFile);
- }
-
- // Stall shuffle if we are above the memory limit
-
- // It is possible that all threads could just be stalling and not make
- // progress at all. This could happen when:
- //
- // requested size is causing the used memory to go above limit &&
- // requested size < singleShuffleLimit &&
- // current used size < mergeThreshold (merge will not get triggered)
- //
- // To avoid this from happening, we allow exactly one thread to go past
- // the memory limit. We check (usedMemory > memoryLimit) and not
- // (usedMemory + requestedSize > memoryLimit). When this thread is done
- // fetching, this will automatically trigger a merge thereby unlocking
- // all the stalled threads
-
- if (usedMemory > memoryLimit) {
- LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
- + ") is greater than memoryLimit (" + memoryLimit + ")." +
- " CommitMemory is (" + commitMemory + ")");
- return stallShuffle;
- }
-
- // Allow the in-memory shuffle to progress
- LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
- + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
- + "CommitMemory is (" + commitMemory + ")");
- return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
- }
-
- /**
- * Unconditional Reserve is used by the Memory-to-Memory thread
- * @return
- */
- private synchronized MapOutput unconditionalReserve(
- InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
- usedMemory += requestedSize;
- return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize,
- primaryMapOutput);
- }
-
- synchronized void unreserve(long size) {
- commitMemory -= size;
- usedMemory -= size;
- }
-
- public synchronized void closeInMemoryFile(MapOutput mapOutput) {
- inMemoryMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
- + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
- + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
- commitMemory+= mapOutput.getSize();
-
- synchronized (inMemoryMerger) {
- // Can hang if mergeThreshold is really low.
- if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
- LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
- commitMemory + " > mergeThreshold=" + mergeThreshold +
- ". Current usedMemory=" + usedMemory);
- inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
- inMemoryMergedMapOutputs.clear();
- inMemoryMerger.startMerge(inMemoryMapOutputs);
- }
- }
-
- if (memToMemMerger != null) {
- synchronized (memToMemMerger) {
- if (!memToMemMerger.isInProgress() &&
- inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
- memToMemMerger.startMerge(inMemoryMapOutputs);
- }
- }
- }
- }
-
-
- public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
- inMemoryMergedMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
- ", inMemoryMergedMapOutputs.size() -> " +
- inMemoryMergedMapOutputs.size());
- }
-
- public synchronized void closeOnDiskFile(Path file) {
- onDiskMapOutputs.add(file);
-
- synchronized (onDiskMerger) {
- if (!onDiskMerger.isInProgress() &&
- onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
- onDiskMerger.startMerge(onDiskMapOutputs);
- }
- }
- }
-
- /**
- * Should <b>only</b> be used after the Shuffle phase is complete, otherwise it can
- * return an invalid state since a merge may not be in progress due to
- * inadequate inputs
- *
- * @return true if the merge process is complete, otherwise false
- */
- @Private
- public boolean isMergeComplete() {
- return finalMergeComplete;
- }
-
- public TezRawKeyValueIterator close() throws Throwable {
- // Wait for on-going merges to complete
- if (memToMemMerger != null) {
- memToMemMerger.close();
- }
- inMemoryMerger.close();
- onDiskMerger.close();
-
- List<MapOutput> memory =
- new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
- memory.addAll(inMemoryMapOutputs);
- List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
- TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
- this.finalMergeComplete = true;
- return kvIter;
- }
-
- void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
- throws IOException, InterruptedException {
- combiner.combine(kvIter, writer);
- }
-
- private class IntermediateMemoryToMemoryMerger
- extends MergeThread<MapOutput> {
-
- public IntermediateMemoryToMemoryMerger(MergeManager manager,
- int mergeFactor) {
- super(manager, mergeFactor, exceptionReporter);
- setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
- "shuffled map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<MapOutput> inputs) throws IOException {
- if (inputs == null || inputs.size() == 0) {
- return;
- }
-
- InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
- List<Segment> inMemorySegments = new ArrayList<Segment>();
- long mergeOutputSize =
- createInMemorySegments(inputs, inMemorySegments, 0);
- int noInMemorySegments = inMemorySegments.size();
-
- MapOutput mergedMapOutputs =
- unconditionalReserve(dummyMapId, mergeOutputSize, false);
-
- Writer writer =
- new InMemoryWriter(mergedMapOutputs.getArrayStream());
-
- LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
- " segments of total-size: " + mergeOutputSize);
-
- TezRawKeyValueIterator rIter =
- TezMerger.merge(conf, rfs,
- ConfigUtils.getIntermediateInputKeyClass(conf),
- ConfigUtils.getIntermediateInputValueClass(conf),
- inMemorySegments, inMemorySegments.size(),
- new Path(inputContext.getUniqueIdentifier()),
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, null, null, null);
- TezMerger.writeFile(rIter, writer, nullProgressable, conf);
- writer.close();
-
- LOG.info(inputContext.getUniqueIdentifier() +
- " Memory-to-Memory merge of the " + noInMemorySegments +
- " files in-memory complete.");
-
- // Note the output of the merge
- closeInMemoryMergedFile(mergedMapOutputs);
- }
- }
-
- private class InMemoryMerger extends MergeThread<MapOutput> {
-
- public InMemoryMerger(MergeManager manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName
- ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
- if (inputs == null || inputs.size() == 0) {
- return;
- }
-
- //name this output file same as the name of the first file that is
- //there in the current list of inmem files (this is guaranteed to
- //be absent on the disk currently. So we don't overwrite a prev.
- //created spill). Also we need to create the output file now since
- //it is not guaranteed that this file will be present after merge
- //is called (we delete empty files as soon as we see them
- //in the merge method)
-
- //figure out the mapId
- InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
-
- List<Segment> inMemorySegments = new ArrayList<Segment>();
- long mergeOutputSize =
- createInMemorySegments(inputs, inMemorySegments,0);
- int noInMemorySegments = inMemorySegments.size();
-
- Path outputPath = mapOutputFile.getInputFileForWrite(
- srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
- mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
-
- Writer writer = null;
- try {
- writer =
- new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null);
-
- TezRawKeyValueIterator rIter = null;
- LOG.info("Initiating in-memory merge with " + noInMemorySegments +
- " segments...");
-
- rIter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- inMemorySegments, inMemorySegments.size(),
- new Path(inputContext.getUniqueIdentifier()),
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, spilledRecordsCounter, null, null);
-
- if (null == combiner) {
- TezMerger.writeFile(rIter, writer, nullProgressable, conf);
- } else {
- runCombineProcessor(rIter, writer);
- }
- writer.close();
- writer = null;
-
- LOG.info(inputContext.getUniqueIdentifier() +
- " Merge of the " + noInMemorySegments +
- " files in-memory complete." +
- " Local file is " + outputPath + " of size " +
- localFS.getFileStatus(outputPath).getLen());
- } catch (IOException e) {
- //make sure that we delete the ondisk file that we created
- //earlier when we invoked cloneFileAttributes
- localFS.delete(outputPath, true);
- throw e;
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
-
- // Note the output of the merge
- closeOnDiskFile(outputPath);
- }
-
- }
-
- private class OnDiskMerger extends MergeThread<Path> {
-
- public OnDiskMerger(MergeManager manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("OnDiskMerger - Thread to merge on-disk map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<Path> inputs) throws IOException {
- // sanity check
- if (inputs == null || inputs.isEmpty()) {
- LOG.info("No ondisk files to merge...");
- return;
- }
-
- long approxOutputSize = 0;
- int bytesPerSum =
- conf.getInt("io.bytes.per.checksum", 512);
-
- LOG.info("OnDiskMerger: We have " + inputs.size() +
- " map outputs on disk. Triggering merge...");
-
- // 1. Prepare the list of files to be merged.
- for (Path file : inputs) {
- approxOutputSize += localFS.getFileStatus(file).getLen();
- }
-
- // add the checksum length
- approxOutputSize +=
- ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
- // 2. Start the on-disk merge process
- Path outputPath =
- localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
- approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
- Writer writer =
- new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null);
- TezRawKeyValueIterator iter = null;
- Path tmpDir = new Path(inputContext.getUniqueIdentifier());
- try {
- iter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, inputs.toArray(new Path[inputs.size()]),
- true, ioSortFactor, tmpDir,
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- nullProgressable, spilledRecordsCounter, null,
- mergedMapOutputsCounter, null);
-
- TezMerger.writeFile(iter, writer, nullProgressable, conf);
- writer.close();
- } catch (IOException e) {
- localFS.delete(outputPath, true);
- throw e;
- }
-
- closeOnDiskFile(outputPath);
-
- LOG.info(inputContext.getUniqueIdentifier() +
- " Finished merging " + inputs.size() +
- " map output files on disk of total-size " +
- approxOutputSize + "." +
- " Local output file is " + outputPath + " of size " +
- localFS.getFileStatus(outputPath).getLen());
- }
- }
-
- private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
- List<Segment> inMemorySegments,
- long leaveBytes
- ) throws IOException {
- long totalSize = 0L;
- // We could use fullSize from the RamManager, but files can be
- // closed but not yet present in inMemoryMapOutputs
- long fullSize = 0L;
- for (MapOutput mo : inMemoryMapOutputs) {
- fullSize += mo.getMemory().length;
- }
- while(fullSize > leaveBytes) {
- MapOutput mo = inMemoryMapOutputs.remove(0);
- byte[] data = mo.getMemory();
- long size = data.length;
- totalSize += size;
- fullSize -= size;
- IFile.Reader reader = new InMemoryReader(MergeManager.this,
- mo.getAttemptIdentifier(),
- data, 0, (int)size);
- inMemorySegments.add(new Segment(reader, true,
- (mo.isPrimaryMapOutput() ?
- mergedMapOutputsCounter : null)));
- }
- return totalSize;
- }
-
- class RawKVIteratorReader extends IFile.Reader {
-
- private final TezRawKeyValueIterator kvIter;
-
- public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
- throws IOException {
- super(null, null, size, null, spilledRecordsCounter);
- this.kvIter = kvIter;
- }
- public boolean nextRawKey(DataInputBuffer key) throws IOException {
- if (kvIter.next()) {
- final DataInputBuffer kb = kvIter.getKey();
- final int kp = kb.getPosition();
- final int klen = kb.getLength() - kp;
- key.reset(kb.getData(), kp, klen);
- bytesRead += klen;
- return true;
- }
- return false;
- }
- public void nextRawValue(DataInputBuffer value) throws IOException {
- final DataInputBuffer vb = kvIter.getValue();
- final int vp = vb.getPosition();
- final int vlen = vb.getLength() - vp;
- value.reset(vb.getData(), vp, vlen);
- bytesRead += vlen;
- }
- public long getPosition() throws IOException {
- return bytesRead;
- }
-
- public void close() throws IOException {
- kvIter.close();
- }
- }
-
- private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
- List<MapOutput> inMemoryMapOutputs,
- List<Path> onDiskMapOutputs
- ) throws IOException {
- LOG.info("finalMerge called with " +
- inMemoryMapOutputs.size() + " in-memory map-outputs and " +
- onDiskMapOutputs.size() + " on-disk map-outputs");
-
- final float maxRedPer =
- job.getFloat(
- TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT);
- if (maxRedPer > 1.0 || maxRedPer < 0.0) {
- throw new IOException(TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT +
- maxRedPer);
- }
- int maxInMemReduce = (int)Math.min(
- Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-
-
- // merge config params
- Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
- Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
- final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
- final RawComparator comparator =
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
-
- // segments required to vacate memory
- List<Segment> memDiskSegments = new ArrayList<Segment>();
- long inMemToDiskBytes = 0;
- boolean mergePhaseFinished = false;
- if (inMemoryMapOutputs.size() > 0) {
- int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
- inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
- memDiskSegments,
- maxInMemReduce);
- final int numMemDiskSegments = memDiskSegments.size();
- if (numMemDiskSegments > 0 &&
- ioSortFactor > onDiskMapOutputs.size()) {
-
- // If we reach here, it implies that we have less than io.sort.factor
- // disk segments and this will be incremented by 1 (result of the
- // memory segments merge). Since this total would still be
- // <= io.sort.factor, we will not do any more intermediate merges,
- // the merge of all these disk segments would be directly fed to the
- // reduce method
-
- mergePhaseFinished = true;
- // must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath =
- mapOutputFile.getInputFileForWrite(srcTaskId,
- inMemToDiskBytes).suffix(
- Constants.MERGED_OUTPUT_PREFIX);
- final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
- keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
- final Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec, null);
- try {
- TezMerger.writeFile(rIter, writer, nullProgressable, job);
- // add to list of final disk outputs.
- onDiskMapOutputs.add(outputPath);
- } catch (IOException e) {
- if (null != outputPath) {
- try {
- fs.delete(outputPath, true);
- } catch (IOException ie) {
- // NOTHING
- }
- }
- throw e;
- } finally {
- if (null != writer) {
- writer.close();
- }
- }
- LOG.info("Merged " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes to disk to satisfy " +
- "reduce memory limit");
- inMemToDiskBytes = 0;
- memDiskSegments.clear();
- } else if (inMemToDiskBytes != 0) {
- LOG.info("Keeping " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes in memory for " +
- "intermediate, on-disk merge");
- }
- }
-
- // segments on disk
- List<Segment> diskSegments = new ArrayList<Segment>();
- long onDiskBytes = inMemToDiskBytes;
- Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
- for (Path file : onDisk) {
- onDiskBytes += fs.getFileStatus(file).getLen();
- LOG.debug("Disk file: " + file + " Length is " +
- fs.getFileStatus(file).getLen());
- diskSegments.add(new Segment(job, fs, file, codec, false,
- (file.toString().endsWith(
- Constants.MERGED_OUTPUT_PREFIX) ?
- null : mergedMapOutputsCounter)
- ));
- }
- LOG.info("Merging " + onDisk.length + " files, " +
- onDiskBytes + " bytes from disk");
- Collections.sort(diskSegments, new Comparator<Segment>() {
- public int compare(Segment o1, Segment o2) {
- if (o1.getLength() == o2.getLength()) {
- return 0;
- }
- return o1.getLength() < o2.getLength() ? -1 : 1;
- }
- });
-
- // build final list of segments from merged backed by disk + in-mem
- List<Segment> finalSegments = new ArrayList<Segment>();
- long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
- finalSegments, 0);
- LOG.info("Merging " + finalSegments.size() + " segments, " +
- inMemBytes + " bytes from memory into reduce");
- if (0 != onDiskBytes) {
- final int numInMemSegments = memDiskSegments.size();
- diskSegments.addAll(0, memDiskSegments);
- memDiskSegments.clear();
- TezRawKeyValueIterator diskMerge = TezMerger.merge(
- job, fs, keyClass, valueClass, diskSegments,
- ioSortFactor, numInMemSegments, tmpDir, comparator,
- nullProgressable, false, spilledRecordsCounter, null, null);
- diskSegments.clear();
- if (0 == finalSegments.size()) {
- return diskMerge;
- }
- finalSegments.add(new Segment(
- new RawKVIteratorReader(diskMerge, onDiskBytes), true));
- }
- return TezMerger.merge(job, fs, keyClass, valueClass,
- finalSegments, finalSegments.size(), tmpDir,
- comparator, nullProgressable, spilledRecordsCounter, null,
- null);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
deleted file mode 100644
index bab882e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-abstract class MergeThread<T> extends Thread {
-
- private static final Log LOG = LogFactory.getLog(MergeThread.class);
-
- private volatile boolean inProgress = false;
- private List<T> inputs = new ArrayList<T>();
- protected final MergeManager manager;
- private final ExceptionReporter reporter;
- private boolean closed = false;
- private final int mergeFactor;
-
- public MergeThread(MergeManager manager, int mergeFactor,
- ExceptionReporter reporter) {
- this.manager = manager;
- this.mergeFactor = mergeFactor;
- this.reporter = reporter;
- }
-
- public synchronized void close() throws InterruptedException {
- closed = true;
- waitForMerge();
- interrupt();
- }
-
- public synchronized boolean isInProgress() {
- return inProgress;
- }
-
- public synchronized void startMerge(Set<T> inputs) {
- if (!closed) {
- inProgress = true;
- this.inputs = new ArrayList<T>();
- Iterator<T> iter=inputs.iterator();
- for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
- this.inputs.add(iter.next());
- iter.remove();
- }
- LOG.info(getName() + ": Starting merge with " + this.inputs.size() +
- " segments, while ignoring " + inputs.size() + " segments");
- notifyAll();
- }
- }
-
- public synchronized void waitForMerge() throws InterruptedException {
- while (inProgress) {
- wait();
- }
- }
-
- public void run() {
- while (true) {
- try {
- // Wait for notification to start the merge...
- synchronized (this) {
- while (!inProgress) {
- wait();
- }
- }
-
- // Merge
- merge(inputs);
- } catch (InterruptedException ie) {
- return;
- } catch(Throwable t) {
- reporter.reportException(t);
- return;
- } finally {
- synchronized (this) {
- // Clear inputs
- inputs = null;
- inProgress = false;
- notifyAll();
- }
- }
- }
- }
-
- public abstract void merge(List<T> inputs)
- throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
deleted file mode 100644
index 15332a1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter {
-
- private static final Log LOG = LogFactory.getLog(Shuffle.class);
- private static final int PROGRESS_FREQUENCY = 2000;
-
- private final Configuration conf;
- private final TezInputContext inputContext;
- private final ShuffleClientMetrics metrics;
-
- private final ShuffleInputEventHandler eventHandler;
- private final ShuffleScheduler scheduler;
- private final MergeManager merger;
- private Throwable throwable = null;
- private String throwingThreadName = null;
- private final int numInputs;
- private final AtomicInteger reduceStartId;
- private final SecretKey jobTokenSecret;
- private AtomicInteger reduceRange = new AtomicInteger(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-
- private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
-
- public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
- this.inputContext = inputContext;
- this.conf = conf;
- this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
- inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
- this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-
- this.numInputs = numInputs;
-
- this.jobTokenSecret = ShuffleUtils
- .getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
-
- Combiner combiner = TezEngineUtils.instantiateCombiner(conf, inputContext);
-
- FileSystem localFS = FileSystem.getLocal(this.conf);
- LocalDirAllocator localDirAllocator =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
- // TODO TEZ Get rid of Map / Reduce references.
- TezCounter shuffledMapsCounter =
- inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
- TezCounter reduceShuffleBytes =
- inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
- TezCounter failedShuffleCounter =
- inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
- TezCounter spilledRecordsCounter =
- inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
- TezCounter reduceCombineInputCounter =
- inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
- TezCounter mergedMapOutputsCounter =
- inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
-
- reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
- LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
- + " with default reduce range: " + reduceRange.get());
-
- scheduler = new ShuffleScheduler(
- this.inputContext,
- this.conf,
- this.numInputs,
- this,
- shuffledMapsCounter,
- reduceShuffleBytes,
- failedShuffleCounter);
- eventHandler= new ShuffleInputEventHandler(
- inputContext,
- this,
- scheduler);
- merger = new MergeManager(
- this.conf,
- localFS,
- localDirAllocator,
- inputContext,
- combiner,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this);
- }
-
- public void handleEvents(List<Event> events) {
- eventHandler.handleEvents(events);
- }
-
- /**
- * Indicates whether the Shuffle and Merge processing is complete.
- * @return false if not complete, true if complete or if an error occurred.
- */
- public boolean isInputReady() {
- if (runShuffleFuture == null) {
- return false;
- }
- return runShuffleFuture.isDone();
- //return scheduler.isDone() && merger.isMergeComplete();
- }
-
- /**
- * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
- * @return an iterator over the fetched input.
- * @throws IOException
- * @throws InterruptedException
- */
- public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
- Preconditions.checkState(runShuffleFuture != null,
- "waitForInput can only be called after run");
- TezRawKeyValueIterator kvIter;
- try {
- kvIter = runShuffleFuture.get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- throw (IOException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else {
- throw new TezUncheckedException(
- "Unexpected exception type while running Shuffle and Merge", cause);
- }
- }
- return kvIter;
- }
-
- public void run() {
- RunShuffleCallable runShuffle = new RunShuffleCallable();
- runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
- new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
- }
-
- private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
- @Override
- public TezRawKeyValueIterator call() throws IOException, InterruptedException {
- // TODO NEWTEZ Limit # fetchers to number of inputs
- final int numFetchers =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
- Fetcher[] fetchers = new Fetcher[numFetchers];
- for (int i = 0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
- fetchers[i].start();
- }
-
- while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
- synchronized (this) {
- if (throwable != null) {
- throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
- }
- }
- }
-
- // Stop the map-output fetcher threads
- for (Fetcher fetcher : fetchers) {
- fetcher.shutDown();
- }
- fetchers = null;
-
- // stop the scheduler
- scheduler.close();
-
-
- // Finish the on-going merges...
- TezRawKeyValueIterator kvIter = null;
- try {
- kvIter = merger.close();
- } catch (Throwable e) {
- throw new ShuffleError("Error while doing final merge " , e);
- }
-
- // Sanity check
- synchronized (Shuffle.this) {
- if (throwable != null) {
- throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
- }
- }
- return kvIter;
- }
- }
-
- public int getReduceStartId() {
- return reduceStartId.get();
- }
-
- public int getReduceRange() {
- return reduceRange.get();
- }
-
- public synchronized void reportException(Throwable t) {
- if (throwable == null) {
- throwable = t;
- throwingThreadName = Thread.currentThread().getName();
- // Notify the scheduler so that the reporting thread finds the
- // exception immediately.
- synchronized (scheduler) {
- scheduler.notifyAll();
- }
- }
- }
-
- public static class ShuffleError extends IOException {
- private static final long serialVersionUID = 5753909320586607881L;
-
- ShuffleError(String msg, Throwable t) {
- super(msg, t);
- }
- }
-
- public void setPartitionRange(int range) {
- if (range == reduceRange.get()) {
- return;
- }
- if (reduceRange.compareAndSet(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT, range)) {
- LOG.info("Reduce range set to: " + range);
- } else {
- TezUncheckedException e =
- new TezUncheckedException("Reduce range can be set only once.");
- reportException(e);
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
deleted file mode 100644
index 850dbeb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.TezEngineUtils;
-
-class ShuffleClientMetrics implements Updater {
-
- private MetricsRecord shuffleMetrics = null;
- private int numFailedFetches = 0;
- private int numSuccessFetches = 0;
- private long numBytes = 0;
- private int numThreadsBusy = 0;
- private final int numCopiers;
-
- ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf,
- String user) {
- this.numCopiers =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-
- MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
- this.shuffleMetrics =
- MetricsUtil.createRecord(metricsContext, "shuffleInput");
- this.shuffleMetrics.setTag("user", user);
- this.shuffleMetrics.setTag("dagName", dagName);
- this.shuffleMetrics.setTag("taskId", TezEngineUtils.getTaskIdentifier(vertexName, taskIndex));
- this.shuffleMetrics.setTag("sessionId",
- conf.get(
- TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID,
- TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
- metricsContext.registerUpdater(this);
- }
- public synchronized void inputBytes(long numBytes) {
- this.numBytes += numBytes;
- }
- public synchronized void failedFetch() {
- ++numFailedFetches;
- }
- public synchronized void successFetch() {
- ++numSuccessFetches;
- }
- public synchronized void threadBusy() {
- ++numThreadsBusy;
- }
- public synchronized void threadFree() {
- --numThreadsBusy;
- }
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
- shuffleMetrics.incrMetric("shuffle_failed_fetches",
- numFailedFetches);
- shuffleMetrics.incrMetric("shuffle_success_fetches",
- numSuccessFetches);
- if (numCopiers != 0) {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
- 100*((float)numThreadsBusy/numCopiers));
- } else {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
- }
- numBytes = 0;
- numSuccessFetches = 0;
- numFailedFetches = 0;
- }
- shuffleMetrics.update();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
deleted file mode 100644
index a918ef1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * Shuffle Header information that is sent by the TaskTracker and
- * deciphered by the Fetcher thread of Reduce task
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class ShuffleHeader implements Writable {
-
- /** Header info of the shuffle http request/response */
- public static final String HTTP_HEADER_NAME = "name";
- public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
- public static final String HTTP_HEADER_VERSION = "version";
- public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
-
- /**
- * The longest possible length of task attempt id that we will accept.
- */
- private static final int MAX_ID_LENGTH = 1000;
-
- String mapId;
- long uncompressedLength;
- long compressedLength;
- int forReduce;
-
- public ShuffleHeader() { }
-
- public ShuffleHeader(String mapId, long compressedLength,
- long uncompressedLength, int forReduce) {
- this.mapId = mapId;
- this.compressedLength = compressedLength;
- this.uncompressedLength = uncompressedLength;
- this.forReduce = forReduce;
- }
-
- public String getMapId() {
- return this.mapId;
- }
-
- public int getPartition() {
- return this.forReduce;
- }
-
- public long getUncompressedLength() {
- return uncompressedLength;
- }
-
- public long getCompressedLength() {
- return compressedLength;
- }
-
- public void readFields(DataInput in) throws IOException {
- mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
- compressedLength = WritableUtils.readVLong(in);
- uncompressedLength = WritableUtils.readVLong(in);
- forReduce = WritableUtils.readVInt(in);
- }
-
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, mapId);
- WritableUtils.writeVLong(out, compressedLength);
- WritableUtils.writeVLong(out, uncompressedLength);
- WritableUtils.writeVInt(out, forReduce);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
deleted file mode 100644
index a8e5fe4..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.net.URI;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.api.events.InputFailedEvent;
-import org.apache.tez.engine.api.events.InputInformationEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class ShuffleInputEventHandler {
-
- private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
-
- private final ShuffleScheduler scheduler;
- private final TezInputContext inputContext;
- private final Shuffle shuffle;
-
- private int maxMapRuntime = 0;
- private boolean shuffleRangeSet = false;
-
- public ShuffleInputEventHandler(TezInputContext inputContext,
- Shuffle shuffle, ShuffleScheduler scheduler) {
- this.inputContext = inputContext;
- this.shuffle = shuffle;
- this.scheduler = scheduler;
- }
-
- public void handleEvents(List<Event> events) {
- for (Event event : events) {
- handleEvent(event);
- }
- }
-
-
- private void handleEvent(Event event) {
- if (event instanceof InputInformationEvent) {
- processInputInformationEvent((InputInformationEvent) event);
- }
- else if (event instanceof DataMovementEvent) {
- processDataMovementEvent((DataMovementEvent) event);
- } else if (event instanceof InputFailedEvent) {
- processTaskFailedEvent((InputFailedEvent) event);
- }
- }
-
- private void processInputInformationEvent(InputInformationEvent iiEvent) {
- InputInformationEventPayloadProto inputInfoPayload;
- try {
- inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
- }
- int partitionRange = inputInfoPayload.getPartitionRange();
- shuffle.setPartitionRange(partitionRange);
- this.shuffleRangeSet = true;
- }
-
- private void processDataMovementEvent(DataMovementEvent dmEvent) {
- // FIXME TODO NEWTEZ
- // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
- DataMovementEventPayloadProto shufflePayload;
- try {
- shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
- }
- int partitionId = dmEvent.getSourceIndex();
- URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
- scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
-
- // TODO NEWTEZ See if this duration hack can be removed.
- int duration = shufflePayload.getRunDuration();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- scheduler.informMaxMapRunTime(maxMapRuntime);
- }
- }
-
- private void processTaskFailedEvent(InputFailedEvent ifEvent) {
- InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
- scheduler.obsoleteMapOutput(taIdentifier);
- LOG.info("Obsoleting output of src-task: " + taIdentifier);
- }
-
- // TODO NEWTEZ Handle encrypted shuffle
- private URI getBaseURI(String host, int port, int partitionId) {
- StringBuilder sb = new StringBuilder("http://");
- sb.append(host);
- sb.append(":");
- sb.append(String.valueOf(port));
- sb.append("/");
-
- sb.append("mapOutput?job=");
- // Required to use the existing ShuffleHandler
- sb.append(inputContext.getApplicationId().toString().replace("application", "job"));
-
- sb.append("&reduce=");
- sb.append(partitionId);
- sb.append("&map=");
- URI u = URI.create(sb.toString());
- return u;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
deleted file mode 100644
index be75668..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.InputReadErrorEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.TezEngineUtils;
-
-import com.google.common.collect.Lists;
-
-class ShuffleScheduler {
- static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
- protected Long initialValue() {
- return 0L;
- }
- };
-
- private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
- private static final int MAX_MAPS_AT_ONCE = 20;
- private static final long INITIAL_PENALTY = 10000;
- private static final float PENALTY_GROWTH_RATE = 1.3f;
-
- // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
- private final Map<Integer, MutableInt> finishedMaps;
- private final int numInputs;
- private int remainingMaps;
- private Map<InputAttemptIdentifier, MapHost> mapLocations = new HashMap<InputAttemptIdentifier, MapHost>();
- //TODO NEWTEZ Clean this and other maps at some point
- private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
- private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
-
- private final Random random = new Random(System.currentTimeMillis());
- private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
- private final Referee referee = new Referee();
- private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
- new HashMap<InputAttemptIdentifier,IntWritable>();
- private final Map<String,IntWritable> hostFailures =
- new HashMap<String,IntWritable>();
- private final TezInputContext inputContext;
- private final Shuffle shuffle;
- private final int abortFailureLimit;
- private final TezCounter shuffledMapsCounter;
- private final TezCounter reduceShuffleBytes;
- private final TezCounter failedShuffleCounter;
-
- private final long startTime;
- private long lastProgressTime;
-
- private int maxMapRuntime = 0;
- private int maxFailedUniqueFetches = 5;
- private int maxFetchFailuresBeforeReporting;
-
- private long totalBytesShuffledTillNow = 0;
- private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
- private boolean reportReadErrorImmediately = true;
-
- public ShuffleScheduler(TezInputContext inputContext,
- Configuration conf,
- int tasksInDegree,
- Shuffle shuffle,
- TezCounter shuffledMapsCounter,
- TezCounter reduceShuffleBytes,
- TezCounter failedShuffleCounter) {
- this.inputContext = inputContext;
- this.numInputs = tasksInDegree;
- abortFailureLimit = Math.max(30, tasksInDegree / 10);
- remainingMaps = tasksInDegree;
- //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
- finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
- this.shuffle = shuffle;
- this.shuffledMapsCounter = shuffledMapsCounter;
- this.reduceShuffleBytes = reduceShuffleBytes;
- this.failedShuffleCounter = failedShuffleCounter;
- this.startTime = System.currentTimeMillis();
- this.lastProgressTime = startTime;
- referee.start();
- this.maxFailedUniqueFetches = Math.min(tasksInDegree,
- this.maxFailedUniqueFetches);
- this.maxFetchFailuresBeforeReporting =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT);
- this.reportReadErrorImmediately =
- conf.getBoolean(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
- }
-
- public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
- MapHost host,
- long bytes,
- long milis,
- MapOutput output
- ) throws IOException {
- String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
- failureCounts.remove(taskIdentifier);
- hostFailures.remove(host.getHostName());
-
- if (!isFinishedTaskTrue(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
- output.commit();
- if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
- shuffledMapsCounter.increment(1);
- if (--remainingMaps == 0) {
- notifyAll();
- }
- }
-
- // update the status
- lastProgressTime = System.currentTimeMillis();
- totalBytesShuffledTillNow += bytes;
- logProgress();
- reduceShuffleBytes.increment(bytes);
- if (LOG.isDebugEnabled()) {
- LOG.debug("src task: "
- + TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
- srcAttemptIdentifier.getAttemptNumber()) + " done");
- }
- }
- // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
- }
-
- private void logProgress() {
- float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
- int mapsDone = numInputs - remainingMaps;
- long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
- float transferRate = mbs / secsSinceStart;
- LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
- + mbpsFormat.format(transferRate) + " MB/s)");
- }
-
- public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
- MapHost host,
- boolean readError) {
- host.penalize();
- int failures = 1;
- if (failureCounts.containsKey(srcAttempt)) {
- IntWritable x = failureCounts.get(srcAttempt);
- x.set(x.get() + 1);
- failures = x.get();
- } else {
- failureCounts.put(srcAttempt, new IntWritable(1));
- }
- String hostname = host.getHostName();
- if (hostFailures.containsKey(hostname)) {
- IntWritable x = hostFailures.get(hostname);
- x.set(x.get() + 1);
- } else {
- hostFailures.put(hostname, new IntWritable(1));
- }
- if (failures >= abortFailureLimit) {
- try {
- throw new IOException(failures
- + " failures downloading "
- + TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
- srcAttempt.getAttemptNumber()));
- } catch (IOException ie) {
- shuffle.reportException(ie);
- }
- }
-
- checkAndInformJobTracker(failures, srcAttempt, readError);
-
- checkReducerHealth();
-
- long delay = (long) (INITIAL_PENALTY *
- Math.pow(PENALTY_GROWTH_RATE, failures));
-
- penalties.add(new Penalty(host, delay));
-
- failedShuffleCounter.increment(1);
- }
-
- // Notify the JobTracker
- // after every read error, if 'reportReadErrorImmediately' is true or
- // after every 'maxFetchFailuresBeforeReporting' failures
- private void checkAndInformJobTracker(
- int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
- if ((reportReadErrorImmediately && readError)
- || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- LOG.info("Reporting fetch failure for "
- + TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.");
-
- List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(new InputReadErrorEvent("Fetch failure for "
- + TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
- .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
-
- inputContext.sendEvents(failedEvents);
- //status.addFailedDependency(mapId);
- }
- }
-
- private void checkReducerHealth() {
- final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
- final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
- final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-
- long totalFailures = failedShuffleCounter.getValue();
- int doneMaps = numInputs - remainingMaps;
-
- boolean reducerHealthy =
- (((float)totalFailures / (totalFailures + doneMaps))
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-
- // check if the reducer has progressed enough
- boolean reducerProgressedEnough =
- (((float)doneMaps / numInputs)
- >= MIN_REQUIRED_PROGRESS_PERCENT);
-
- // check if the reducer is stalled for a long time
- // duration for which the reducer is stalled
- int stallDuration =
- (int)(System.currentTimeMillis() - lastProgressTime);
-
- // duration for which the reducer ran with progress
- int shuffleProgressDuration =
- (int)(lastProgressTime - startTime);
-
- // min time the reducer should run without getting killed
- int minShuffleRunDuration =
- (shuffleProgressDuration > maxMapRuntime)
- ? shuffleProgressDuration
- : maxMapRuntime;
-
- boolean reducerStalled =
- (((float)stallDuration / minShuffleRunDuration)
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
-
- // kill if not healthy and has insufficient progress
- if ((failureCounts.size() >= maxFailedUniqueFetches ||
- failureCounts.size() == (numInputs - doneMaps))
- && !reducerHealthy
- && (!reducerProgressedEnough || reducerStalled)) {
- LOG.fatal("Shuffle failed with too many fetch failures " +
- "and insufficient progress!");
- String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
- shuffle.reportException(new IOException(errorMsg));
- }
-
- }
-
- public synchronized void tipFailed(int srcTaskIndex) {
- if (!isFinishedTaskTrue(srcTaskIndex)) {
- setFinishedTaskTrue(srcTaskIndex);
- if (--remainingMaps == 0) {
- notifyAll();
- }
- logProgress();
- }
- }
-
- public synchronized void addKnownMapOutput(String hostName,
- int partitionId,
- String hostUrl,
- InputAttemptIdentifier srcAttempt) {
- String identifier = MapHost.createIdentifier(hostName, partitionId);
- MapHost host = mapLocations.get(identifier);
- if (host == null) {
- host = new MapHost(partitionId, hostName, hostUrl);
- assert identifier.equals(host.getIdentifier());
- mapLocations.put(srcAttempt, host);
- }
- host.addKnownMap(srcAttempt);
- pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
-
- // Mark the host as pending
- if (host.getState() == MapHost.State.PENDING) {
- pendingHosts.add(host);
- notifyAll();
- }
- }
-
- public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
- // The incoming srcAttempt does not contain a path component.
- obsoleteMaps.add(srcAttempt);
- }
-
- public synchronized void putBackKnownMapOutput(MapHost host,
- InputAttemptIdentifier srcAttempt) {
- host.addKnownMap(srcAttempt);
- }
-
- public synchronized MapHost getHost() throws InterruptedException {
- while(pendingHosts.isEmpty()) {
- wait();
- }
-
- MapHost host = null;
- Iterator<MapHost> iter = pendingHosts.iterator();
- int numToPick = random.nextInt(pendingHosts.size());
- for (int i=0; i <= numToPick; ++i) {
- host = iter.next();
- }
-
- pendingHosts.remove(host);
- host.markBusy();
-
- LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
- " to " + Thread.currentThread().getName());
- shuffleStart.set(System.currentTimeMillis());
-
- return host;
- }
-
- public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
- return pathToIdentifierMap.get(pathComponent);
- }
-
- public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
- List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
- Iterator<InputAttemptIdentifier> itr = list.iterator();
- List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
- int includedMaps = 0;
- int totalSize = list.size();
- // find the maps that we still need, up to the limit
- while (itr.hasNext()) {
- InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
- result.add(id);
- if (++includedMaps >= MAX_MAPS_AT_ONCE) {
- break;
- }
- }
- }
- // put back the maps left after the limit
- while (itr.hasNext()) {
- InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
- host.addKnownMap(id);
- }
- }
- LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
- host + " to " + Thread.currentThread().getName());
- return result;
- }
-
- public synchronized void freeHost(MapHost host) {
- if (host.getState() != MapHost.State.PENALIZED) {
- if (host.markAvailable() == MapHost.State.PENDING) {
- pendingHosts.add(host);
- notifyAll();
- }
- }
- LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
- (System.currentTimeMillis()-shuffleStart.get()) + "s");
- }
-
- public synchronized void resetKnownMaps() {
- mapLocations.clear();
- obsoleteMaps.clear();
- pendingHosts.clear();
- pathToIdentifierMap.clear();
- }
-
- /**
- * Utility method to check if the Shuffle data fetch is complete.
- * @return
- */
- public synchronized boolean isDone() {
- return remainingMaps == 0;
- }
-
- /**
- * Wait until the shuffle finishes or until the timeout.
- * @param millis maximum wait time
- * @return true if the shuffle is done
- * @throws InterruptedException
- */
- public synchronized boolean waitUntilDone(int millis
- ) throws InterruptedException {
- if (remainingMaps > 0) {
- wait(millis);
- return remainingMaps == 0;
- }
- return true;
- }
-
- /**
- * A structure that records the penalty for a host.
- */
- private static class Penalty implements Delayed {
- MapHost host;
- private long endTime;
-
- Penalty(MapHost host, long delay) {
- this.host = host;
- this.endTime = System.currentTimeMillis() + delay;
- }
-
- public long getDelay(TimeUnit unit) {
- long remainingTime = endTime - System.currentTimeMillis();
- return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
- }
-
- public int compareTo(Delayed o) {
- long other = ((Penalty) o).endTime;
- return endTime == other ? 0 : (endTime < other ? -1 : 1);
- }
-
- }
-
- /**
- * A thread that takes hosts off of the penalty list when the timer expires.
- */
- private class Referee extends Thread {
- public Referee() {
- setName("ShufflePenaltyReferee");
- setDaemon(true);
- }
-
- public void run() {
- try {
- while (true) {
- // take the first host that has an expired penalty
- MapHost host = penalties.take().host;
- synchronized (ShuffleScheduler.this) {
- if (host.markAvailable() == MapHost.State.PENDING) {
- pendingHosts.add(host);
- ShuffleScheduler.this.notifyAll();
- }
- }
- }
- } catch (InterruptedException ie) {
- return;
- } catch (Throwable t) {
- shuffle.reportException(t);
- }
- }
- }
-
- public void close() throws InterruptedException {
- referee.interrupt();
- referee.join();
- }
-
- public synchronized void informMaxMapRunTime(int duration) {
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- }
- }
-
- void setFinishedTaskTrue(int srcTaskIndex) {
- synchronized(finishedMaps) {
- finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
- }
- }
-
- boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
- synchronized(finishedMaps) {
- MutableInt result = finishedMaps.get(srcTaskIndex);
- if(result == null) {
- result = new MutableInt(0);
- finishedMaps.put(srcTaskIndex, result);
- }
- result.increment();
- return isFinishedTaskTrue(srcTaskIndex);
- }
- }
-
- boolean isFinishedTaskTrue(int srcTaskIndex) {
- synchronized (finishedMaps) {
- MutableInt result = finishedMaps.get(srcTaskIndex);
- if(result == null) {
- return false;
- }
- if (result.intValue() == shuffle.getReduceRange()) {
- return true;
- }
- return false;
- }
- }
-}