You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:45 UTC
[38/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
deleted file mode 100644
index a984b0f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.ValuesIterator;
-import org.apache.tez.engine.common.shuffle.impl.Shuffle;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- *
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- *
- */
-public class ShuffledMergedInput implements LogicalInput {
-
- static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
-
- protected TezInputContext inputContext;
- protected TezRawKeyValueIterator rawIter = null;
- protected Configuration conf;
- protected int numInputs = 0;
- protected Shuffle shuffle;
- @SuppressWarnings("rawtypes")
- protected ValuesIterator vIter;
-
- private TezCounter inputKeyCounter;
- private TezCounter inputValueCounter;
-
- @Override
- public List<Event> initialize(TezInputContext inputContext) throws IOException {
- this.inputContext = inputContext;
- this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-
- this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
- this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
- inputContext.getWorkDirs());
-
- // Start the shuffle - copy and merge.
- shuffle = new Shuffle(inputContext, this.conf, numInputs);
- shuffle.run();
-
- return Collections.emptyList();
- }
-
- /**
- * Check if the input is ready for consumption
- *
- * @return true if the input is ready for consumption, or if an error occurred
- * while fetching the input. false if the shuffle and merge are
- * still in progress
- */
- public boolean isInputReady() {
- return shuffle.isInputReady();
- }
-
- /**
- * Waits for the input to become ready for consumption
- * @throws IOException
- * @throws InterruptedException
- */
- public void waitForInputReady() throws IOException, InterruptedException {
- rawIter = shuffle.waitForInput();
- createValuesIterator();
- }
-
- @Override
- public List<Event> close() throws IOException {
- rawIter.close();
- return Collections.emptyList();
- }
-
- /**
- * Get a KVReader for the Input. <p> This method will block until the input is
- * ready - i.e. the copy and merge stages are complete. Users can use the
- * isInputReady method to check if the input is ready, which gives an
- * indication of whether this method will block or not.
- *
- * NOTE: All values for the current K-V pair must be read prior to invoking
- * moveToNext. Once moveToNext() is called, the valueIterator from the
- * previous K-V pair will throw an Exception
- *
- * @return a KVReader over the sorted input.
- */
- @Override
- public KVReader getReader() throws IOException {
- if (rawIter == null) {
- try {
- waitForInputReady();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for input ready", e);
- }
- }
- return new KVReader() {
-
- @Override
- public boolean next() throws IOException {
- return vIter.moveToNext();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public KVRecord getCurrentKV() {
- return new KVRecord(vIter.getKey(), vIter.getValues());
- }
- };
- }
-
- @Override
- public void handleEvents(List<Event> inputEvents) {
- shuffle.handleEvents(inputEvents);
- }
-
- @Override
- public void setNumPhysicalInputs(int numInputs) {
- this.numInputs = numInputs;
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- protected void createValuesIterator()
- throws IOException {
- vIter = new ValuesIterator(rawIter,
- (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
- ConfigUtils.getIntermediateInputKeyClass(conf),
- ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
-
- }
-
- // This functionality is currently broken. If there are inputs which need to be
- // written to disk, there's a possibility that inputs from the different
- // sources could clobber each other's output. Also the current structures do
- // not have adequate information to de-dupe these (vertex name)
-// public void mergeWith(ShuffledMergedInput other) {
-// this.numInputs += other.getNumPhysicalInputs();
-// }
-//
-// public int getNumPhysicalInputs() {
-// return this.numInputs;
-// }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
deleted file mode 100644
index f2da031..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- *
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- *
- */
-
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-@LimitedPrivate("mapreduce")
-public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
-
- @Private
- public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
- // wait for input so that iterator is available
- waitForInputReady();
- return rawIter;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
deleted file mode 100644
index 44238fd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.input;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.Reader;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.broadcast.input.BroadcastShuffleManager;
-
-import com.google.common.base.Preconditions;
-
-public class ShuffledUnorderedKVInput implements LogicalInput {
-
- private Configuration conf;
- private int numInputs = -1;
- private BroadcastShuffleManager shuffleManager;
-
-
-
- public ShuffledUnorderedKVInput() {
- }
-
- @Override
- public List<Event> initialize(TezInputContext inputContext) throws Exception {
- Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
- this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-
- this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
- return null;
- }
-
- @Override
- public Reader getReader() throws Exception {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void handleEvents(List<Event> inputEvents) {
- shuffleManager.handleEvents(inputEvents);
- }
-
- @Override
- public List<Event> close() throws Exception {
- this.shuffleManager.shutdown();
- return null;
- }
-
- @Override
- public void setNumPhysicalInputs(int numInputs) {
- this.numInputs = numInputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
deleted file mode 100644
index 26a01c8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.Writer;
-import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
-
-/**
- * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs
- * written to it and persists it to a file.
- */
-public class InMemorySortedOutput implements LogicalOutput {
-
- protected InMemoryShuffleSorter sorter;
- protected int numTasks;
- protected TezOutputContext outputContext;
-
-
- @Override
- public List<Event> initialize(TezOutputContext outputContext)
- throws IOException {
- this.outputContext = outputContext;
- this.sorter = new InMemoryShuffleSorter();
- sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
- return Collections.emptyList();
- }
-
- @Override
- public Writer getWriter() throws IOException {
- return new KVWriter() {
-
- @Override
- public void write(Object key, Object value) throws IOException {
- sorter.write(key, value);
- }
- };
- }
-
- @Override
- public void handleEvents(List<Event> outputEvents) {
- // No events expected.
- }
-
- @Override
- public void setNumPhysicalOutputs(int numOutputs) {
- this.numTasks = numOutputs;
- }
-
- @Override
- public List<Event> close() throws IOException {
- sorter.flush();
- sorter.close();
- // TODO NEWTEZ Event generation
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
deleted file mode 100644
index 7fd26d7..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-public class LocalOnFileSorterOutput extends OnFileSortedOutput {
-
- private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
-
-
-
- @Override
- public List<Event> close() throws IOException {
- LOG.debug("Closing LocalOnFileSorterOutput");
- super.close();
-
- TezTaskOutput mapOutputFile = sorter.getMapOutput();
- FileSystem localFs = FileSystem.getLocal(conf);
-
- Path src = mapOutputFile.getOutputFile();
- Path dst =
- mapOutputFile.getInputFileForWrite(
- outputContext.getTaskIndex(),
- localFs.getFileStatus(src).getLen());
-
- LOG.info("Renaming src = " + src + ", dst = " + dst);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming src = " + src + ", dst = " + dst);
- }
- localFs.rename(src, dst);
- return null;
- }
-
- @Override
- protected List<Event> generateDataMovementEventsOnClose() throws IOException {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
deleted file mode 100644
index 9c9eba0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.collect.Lists;
-
-/**
- * <code>OnFileSortedOutput</code> is a {@link LogicalOutput} which sorts key/value pairs
- * written to it and persists it to a file.
- */
-public class OnFileSortedOutput implements LogicalOutput {
-
- protected ExternalSorter sorter;
- protected Configuration conf;
- protected int numOutputs;
- protected TezOutputContext outputContext;
- private long startTime;
- private long endTime;
-
-
- @Override
- public List<Event> initialize(TezOutputContext outputContext)
- throws IOException {
- this.startTime = System.nanoTime();
- this.outputContext = outputContext;
- sorter = new DefaultSorter();
- this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
- // Initializing this parameter in this conf since it is used in multiple
- // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
- // TezMerger, etc.
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
- sorter.initialize(outputContext, conf, numOutputs);
- return Collections.emptyList();
- }
-
- @Override
- public KVWriter getWriter() throws IOException {
- return new KVWriter() {
- @Override
- public void write(Object key, Object value) throws IOException {
- sorter.write(key, value);
- }
- };
- }
-
- @Override
- public void handleEvents(List<Event> outputEvents) {
- // Not expecting any events.
- }
-
- @Override
- public void setNumPhysicalOutputs(int numOutputs) {
- this.numOutputs = numOutputs;
- }
-
- @Override
- public List<Event> close() throws IOException {
- sorter.flush();
- sorter.close();
- this.endTime = System.nanoTime();
-
- return generateDataMovementEventsOnClose();
- }
-
- protected List<Event> generateDataMovementEventsOnClose() throws IOException {
- String host = System.getenv(ApplicationConstants.Environment.NM_HOST
- .toString());
- ByteBuffer shuffleMetadata = outputContext
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
- int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
-
- DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
- .newBuilder();
- payloadBuilder.setHost(host);
- payloadBuilder.setPort(shufflePort);
- payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
- payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
- DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
- byte[] payloadBytes = payloadProto.toByteArray();
-
- List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
-
- for (int i = 0; i < numOutputs; i++) {
- DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
- events.add(event);
- }
- return events;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
deleted file mode 100644
index 3ff603f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.output;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.broadcast.output.FileBasedKVWriter;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class OnFileUnorderedKVOutput implements LogicalOutput {
-
- private TezOutputContext outputContext;
- private FileBasedKVWriter kvWriter;
-
- public OnFileUnorderedKVOutput() {
- }
-
- @Override
- public List<Event> initialize(TezOutputContext outputContext)
- throws Exception {
- this.outputContext = outputContext;
- this.kvWriter = new FileBasedKVWriter(outputContext);
- return Collections.emptyList();
- }
-
- @Override
- public KVWriter getWriter() throws Exception {
- return kvWriter;
- }
-
- @Override
- public void handleEvents(List<Event> outputEvents) {
- throw new TezUncheckedException("Not expecting any events");
- }
-
- @Override
- public List<Event> close() throws Exception {
- boolean outputGenerated = this.kvWriter.close();
- DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
- .newBuilder();
-
- String host = System.getenv(ApplicationConstants.Environment.NM_HOST
- .toString());
- ByteBuffer shuffleMetadata = outputContext
- .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
- int shufflePort = ShuffleUtils
- .deserializeShuffleProviderMetaData(shuffleMetadata);
- payloadBuilder.setOutputGenerated(outputGenerated);
- if (outputGenerated) {
- payloadBuilder.setHost(host);
- payloadBuilder.setPort(shufflePort);
- payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
- }
- DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-
- DataMovementEvent dmEvent = new DataMovementEvent(0,
- payloadProto.toByteArray());
- List<Event> events = Lists.newArrayListWithCapacity(1);
- events.add(dmEvent);
- return events;
- }
-
- @Override
- public void setNumPhysicalOutputs(int numOutputs) {
- Preconditions.checkArgument(numOutputs == 1,
- "Number of outputs can only be 1 for " + this.getClass().getName());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
deleted file mode 100644
index 29063f9..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.LogicalIOProcessor;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.impl.EventMetaData;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.api.impl.TezInputContextImpl;
-import org.apache.tez.engine.api.impl.TezOutputContextImpl;
-import org.apache.tez.engine.api.impl.TezProcessorContextImpl;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-@Private
-public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
-
- private static final Log LOG = LogFactory
- .getLog(LogicalIOProcessorRuntimeTask.class);
-
- private final List<InputSpec> inputSpecs;
- private final List<LogicalInput> inputs;
-
- private final List<OutputSpec> outputSpecs;
- private final List<LogicalOutput> outputs;
-
- private List<TezInputContext> inputContexts;
- private List<TezOutputContext> outputContexts;
- private TezProcessorContext processorContext;
-
- private final ProcessorDescriptor processorDescriptor;
- private final LogicalIOProcessor processor;
-
- private final Map<String, ByteBuffer> serviceConsumerMetadata;
-
- private Map<String, LogicalInput> inputMap;
- private Map<String, LogicalOutput> outputMap;
-
- private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
- private Thread eventRouterThread = null;
-
- private final int appAttemptNumber;
-
- public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
- Configuration tezConf, TezUmbilical tezUmbilical,
- Token<JobTokenIdentifier> jobToken) throws IOException {
- // TODO Remove jobToken from here post TEZ-421
- super(taskSpec, tezConf, tezUmbilical);
- LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
- + taskSpec);
- this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size());
- this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size());
- this.inputSpecs = taskSpec.getInputs();
- this.inputs = createInputs(inputSpecs);
- this.outputSpecs = taskSpec.getOutputs();
- this.outputs = createOutputs(outputSpecs);
- this.processorDescriptor = taskSpec.getProcessorDescriptor();
- this.processor = createProcessor(processorDescriptor);
- this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
- ShuffleUtils.convertJobTokenToBytes(jobToken));
- this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
- this.state = State.NEW;
- this.appAttemptNumber = appAttemptNumber;
- }
-
- public void initialize() throws Exception {
- LOG.info("Initializing LogicalProcessorIORuntimeTask");
- Preconditions.checkState(this.state == State.NEW, "Already initialized");
- this.state = State.INITED;
- inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
- outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
-
- // TODO Maybe close initialized inputs / outputs in case of failure to
- // initialize.
- // Initialize all inputs. TODO: Multi-threaded at some point.
- for (int i = 0; i < inputs.size(); i++) {
- String srcVertexName = inputSpecs.get(i).getSourceVertexName();
- initializeInput(inputs.get(i),
- inputSpecs.get(i));
- inputMap.put(srcVertexName, inputs.get(i));
- }
-
- // Initialize all outputs. TODO: Multi-threaded at some point.
- for (int i = 0; i < outputs.size(); i++) {
- String destVertexName = outputSpecs.get(i).getDestinationVertexName();
- initializeOutput(outputs.get(i), outputSpecs.get(i));
- outputMap.put(destVertexName, outputs.get(i));
- }
-
- // Initialize processor.
- initializeLogicalIOProcessor();
- startRouterThread();
- }
-
- public void run() throws Exception {
- synchronized (this.state) {
- Preconditions.checkState(this.state == State.INITED,
- "Can only run while in INITED state. Current: " + this.state);
- this.state = State.RUNNING;
- }
- LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
- lioProcessor.run(inputMap, outputMap);
- }
-
- public void close() throws Exception {
- try {
- Preconditions.checkState(this.state == State.RUNNING,
- "Can only run while in RUNNING state. Current: " + this.state);
- this.state = State.CLOSED;
-
- // Close the Inputs.
- for (int i = 0; i < inputs.size(); i++) {
- String srcVertexName = inputSpecs.get(i).getSourceVertexName();
- List<Event> closeInputEvents = inputs.get(i).close();
- sendTaskGeneratedEvents(closeInputEvents,
- EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
- srcVertexName, taskSpec.getTaskAttemptID());
- }
-
- // Close the Processor.
- processor.close();
-
- // Close the Outputs.
- for (int i = 0; i < outputs.size(); i++) {
- String destVertexName = outputSpecs.get(i).getDestinationVertexName();
- List<Event> closeOutputEvents = outputs.get(i).close();
- sendTaskGeneratedEvents(closeOutputEvents,
- EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
- destVertexName, taskSpec.getTaskAttemptID());
- }
- } finally {
- setTaskDone();
- if (eventRouterThread != null) {
- eventRouterThread.interrupt();
- }
- }
- }
-
- private void initializeInput(Input input, InputSpec inputSpec)
- throws Exception {
- TezInputContext tezInputContext = createInputContext(inputSpec);
- inputContexts.add(tezInputContext);
- if (input instanceof LogicalInput) {
- ((LogicalInput) input).setNumPhysicalInputs(inputSpec
- .getPhysicalEdgeCount());
- }
- LOG.info("Initializing Input using InputSpec: " + inputSpec);
- List<Event> events = input.initialize(tezInputContext);
- sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
- tezInputContext.getTaskVertexName(),
- tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
- }
-
- private void initializeOutput(Output output, OutputSpec outputSpec)
- throws Exception {
- TezOutputContext tezOutputContext = createOutputContext(outputSpec);
- outputContexts.add(tezOutputContext);
- if (output instanceof LogicalOutput) {
- ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
- .getPhysicalEdgeCount());
- }
- LOG.info("Initializing Output using OutputSpec: " + outputSpec);
- List<Event> events = output.initialize(tezOutputContext);
- sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
- tezOutputContext.getTaskVertexName(),
- tezOutputContext.getDestinationVertexName(),
- taskSpec.getTaskAttemptID());
- }
-
- private void initializeLogicalIOProcessor() throws Exception {
- LOG.info("Initializing processor"
- + ", processorClassName=" + processorDescriptor.getClassName());
- TezProcessorContext processorContext = createProcessorContext();
- this.processorContext = processorContext;
- processor.initialize(processorContext);
- }
-
- private TezInputContext createInputContext(InputSpec inputSpec) {
- TezInputContext inputContext = new TezInputContextImpl(tezConf,
- appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
- inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
- tezCounters,
- inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
- .getProcessorDescriptor().getUserPayload() : inputSpec
- .getInputDescriptor().getUserPayload(), this,
- serviceConsumerMetadata);
- return inputContext;
- }
-
- private TezOutputContext createOutputContext(OutputSpec outputSpec) {
- TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
- appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
- outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
- tezCounters,
- outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
- .getProcessorDescriptor().getUserPayload() : outputSpec
- .getOutputDescriptor().getUserPayload(), this,
- serviceConsumerMetadata);
- return outputContext;
- }
-
- private TezProcessorContext createProcessorContext() {
- TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
- appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
- tezCounters, processorDescriptor.getUserPayload(), this,
- serviceConsumerMetadata);
- return processorContext;
- }
-
- private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
- List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
- for (InputSpec inputSpec : inputSpecs) {
- LOG.info("Creating Input from InputSpec: "
- + inputSpec);
- Input input = RuntimeUtils.createClazzInstance(inputSpec
- .getInputDescriptor().getClassName());
-
- if (input instanceof LogicalInput) {
- inputs.add((LogicalInput) input);
- } else {
- throw new TezUncheckedException(input.getClass().getName()
- + " is not a sub-type of LogicalInput."
- + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
- }
- }
- return inputs;
- }
-
- private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
- List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
- outputSpecs.size());
- for (OutputSpec outputSpec : outputSpecs) {
- LOG.info("Creating Output from OutputSpec"
- + outputSpec);
- Output output = RuntimeUtils.createClazzInstance(outputSpec
- .getOutputDescriptor().getClassName());
- if (output instanceof LogicalOutput) {
- outputs.add((LogicalOutput) output);
- } else {
- throw new TezUncheckedException(output.getClass().getName()
- + " is not a sub-type of LogicalOutput."
- + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
- }
- }
- return outputs;
- }
-
- private LogicalIOProcessor createProcessor(
- ProcessorDescriptor processorDescriptor) {
- Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
- .getClassName());
- if (!(processor instanceof LogicalIOProcessor)) {
- throw new TezUncheckedException(processor.getClass().getName()
- + " is not a sub-type of LogicalIOProcessor."
- + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
- }
- return (LogicalIOProcessor) processor;
- }
-
- private void sendTaskGeneratedEvents(List<Event> events,
- EventProducerConsumerType generator, String taskVertexName,
- String edgeVertexName, TezTaskAttemptID taskAttemptID) {
- if (events == null || events.isEmpty()) {
- return;
- }
- EventMetaData eventMetaData = new EventMetaData(generator,
- taskVertexName, edgeVertexName, taskAttemptID);
- List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
- for (Event e : events) {
- TezEvent te = new TezEvent(e, eventMetaData);
- tezEvents.add(te);
- }
- if (LOG.isDebugEnabled()) {
- for (TezEvent e : tezEvents) {
- LOG.debug("Generated event info"
- + ", eventMetaData=" + eventMetaData.toString()
- + ", eventType=" + e.getEventType());
- }
- }
- tezUmbilical.addEvents(tezEvents);
- }
-
- private boolean handleEvent(TezEvent e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Handling TezEvent in task"
- + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
- + ", eventType=" + e.getEventType()
- + ", eventSourceInfo=" + e.getSourceInfo()
- + ", eventDestinationInfo=" + e.getDestinationInfo());
- }
- try {
- switch (e.getDestinationInfo().getEventGenerator()) {
- case INPUT:
- LogicalInput input = inputMap.get(
- e.getDestinationInfo().getEdgeVertexName());
- if (input != null) {
- input.handleEvents(Collections.singletonList(e.getEvent()));
- } else {
- throw new TezUncheckedException("Unhandled event for invalid target: "
- + e);
- }
- break;
- case OUTPUT:
- LogicalOutput output = outputMap.get(
- e.getDestinationInfo().getEdgeVertexName());
- if (output != null) {
- output.handleEvents(Collections.singletonList(e.getEvent()));
- } else {
- throw new TezUncheckedException("Unhandled event for invalid target: "
- + e);
- }
- break;
- case PROCESSOR:
- processor.handleEvents(Collections.singletonList(e.getEvent()));
- break;
- case SYSTEM:
- LOG.warn("Trying to send a System event in a Task: " + e);
- break;
- }
- } catch (Throwable t) {
- LOG.warn("Failed to handle event", t);
- setFatalError(t, "Failed to handle event");
- EventMetaData sourceInfo = new EventMetaData(
- e.getDestinationInfo().getEventGenerator(),
- taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
- getTaskAttemptID());
- tezUmbilical.signalFatalError(getTaskAttemptID(),
- StringUtils.stringifyException(t), sourceInfo);
- return false;
- }
- return true;
- }
-
- @Override
- public synchronized void handleEvents(Collection<TezEvent> events) {
- if (events == null || events.isEmpty()) {
- return;
- }
- eventCounter.addAndGet(events.size());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received events to be processed by task"
- + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
- + ", eventCount=" + events.size()
- + ", newEventCounter=" + eventCounter.get());
- }
- eventsToBeProcessed.addAll(events);
- }
-
- private void startRouterThread() {
- eventRouterThread = new Thread(new Runnable() {
- public void run() {
- while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
- try {
- TezEvent e = eventsToBeProcessed.take();
- if (e == null) {
- continue;
- }
- // TODO TODONEWTEZ
- if (!handleEvent(e)) {
- LOG.warn("Stopping Event Router thread as failed to handle"
- + " event: " + e);
- return;
- }
- } catch (InterruptedException e) {
- if (!isTaskDone()) {
- LOG.warn("Event Router thread interrupted. Returning.");
- }
- return;
- }
- }
- }
- });
-
- eventRouterThread.setName("TezTaskEventRouter["
- + taskSpec.getTaskAttemptID().toString() + "]");
- eventRouterThread.start();
- }
-
- public synchronized void cleanup() {
- setTaskDone();
- if (eventRouterThread != null) {
- eventRouterThread.interrupt();
- }
- }
-
- @Private
- @VisibleForTesting
- public List<TezInputContext> getInputContexts() {
- return this.inputContexts;
- }
-
- @Private
- @VisibleForTesting
- public List<TezOutputContext> getOutputContexts() {
- return this.outputContexts;
- }
-
- @Private
- @VisibleForTesting
- public TezProcessorContext getProcessorContext() {
- return this.processorContext;
- }
-
- @Private
- @VisibleForTesting
- public Map<String, LogicalInput> getInputs() {
- return this.inputMap;
- }
-
- @Private
- @VisibleForTesting
- public Map<String, LogicalOutput> getOutputs() {
- return this.outputMap;
- }
-
- @Private
- @VisibleForTesting
- public LogicalIOProcessor getProcessor() {
- return this.processor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
deleted file mode 100644
index 22cbc7c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-
-public abstract class RuntimeTask {
-
- protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
- protected Throwable fatalError = null;
- protected String fatalErrorMessage = null;
- protected float progress;
- protected final TezCounters tezCounters;
- protected final TaskSpec taskSpec;
- protected final Configuration tezConf;
- protected final TezUmbilical tezUmbilical;
- protected final AtomicInteger eventCounter;
- private final AtomicBoolean taskDone;
-
- protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
- TezUmbilical tezUmbilical) {
- this.taskSpec = taskSpec;
- this.tezConf = tezConf;
- this.tezUmbilical = tezUmbilical;
- this.tezCounters = new TezCounters();
- this.eventCounter = new AtomicInteger(0);
- this.progress = 0.0f;
- this.taskDone = new AtomicBoolean(false);
- }
-
- protected enum State {
- NEW, INITED, RUNNING, CLOSED;
- }
-
- protected State state;
-
- public String getVertexName() {
- return taskSpec.getVertexName();
- }
-
- public void setFatalError(Throwable t, String message) {
- hasFatalError.set(true);
- this.fatalError = t;
- this.fatalErrorMessage = message;
- }
-
- public boolean hadFatalError() {
- return hasFatalError.get();
- }
-
- public synchronized void setProgress(float progress) {
- this.progress = progress;
- }
-
- public synchronized float getProgress() {
- return this.progress;
- }
-
- public TezCounters getCounters() {
- return this.tezCounters;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskSpec.getTaskAttemptID();
- }
-
- public abstract void handleEvents(Collection<TezEvent> events);
-
- public int getEventCounter() {
- return eventCounter.get();
- }
-
- public boolean isTaskDone() {
- return taskDone.get();
- }
-
- protected void setTaskDone() {
- taskDone.set(true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
deleted file mode 100644
index 20a029e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.tez.dag.api.TezUncheckedException;
-
-public class RuntimeUtils {
-
- private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
-
- private static Class<?> getClazz(String className) {
- Class<?> clazz = CLAZZ_CACHE.get(className);
- if (clazz == null) {
- try {
- clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new TezUncheckedException("Unable to load class: " + className, e);
- }
- }
- return clazz;
- }
-
- private static <T> T getNewInstance(Class<T> clazz) {
- T instance;
- try {
- instance = clazz.newInstance();
- } catch (InstantiationException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
- }
- return instance;
- }
-
- public static <T> T createClazzInstance(String className) {
- Class<?> clazz = getClazz(className);
- @SuppressWarnings("unchecked")
- T instance = (T) getNewInstance(clazz);
- return instance;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
deleted file mode 100644
index 531e460..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-import com.google.common.base.Preconditions;
-
-public class DiskFetchedInput extends FetchedInput {
-
- private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class);
-
- private final FileSystem localFS;
- private final Path tmpOutputPath;
- private final Path outputPath;
-
- public DiskFetchedInput(long size,
- InputAttemptIdentifier inputAttemptIdentifier,
- FetchedInputCallback callbackHandler, Configuration conf,
- LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
- throws IOException {
- super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
-
- this.localFS = FileSystem.getLocal(conf);
- this.outputPath = filenameAllocator.getInputFileForWrite(
- this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
- this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return localFS.create(tmpOutputPath);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return localFS.open(outputPath);
- }
-
- @Override
- public void commit() throws IOException {
- if (state == State.PENDING) {
- state = State.COMMITTED;
- localFS.rename(tmpOutputPath, outputPath);
- notifyFetchComplete();
- }
- }
-
- @Override
- public void abort() throws IOException {
- if (state == State.PENDING) {
- state = State.ABORTED;
- // TODO NEWTEZ Maybe defer this to container cleanup
- localFS.delete(tmpOutputPath, false);
- notifyFetchFailure();
- }
- }
-
- @Override
- public void free() {
- Preconditions.checkState(
- state == State.COMMITTED || state == State.ABORTED,
- "FetchedInput can only be freed after it is committed or aborted");
- if (state == State.COMMITTED) {
- state = State.FREED;
- try {
- // TODO NEWTEZ Maybe defer this to container cleanup
- localFS.delete(outputPath, false);
- } catch (IOException e) {
- // Ignoring the exception, will eventually be cleaned by container
- // cleanup.
- LOG.warn("Failed to remvoe file : " + outputPath.toString());
- }
- notifyFreedResource();
- }
- }
-
- @Override
- public String toString() {
- return "DiskFetchedInput [outputPath=" + outputPath
- + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
- + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
deleted file mode 100644
index fb0b324..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-public class FetchResult {
-
- private final String host;
- private final int port;
- private final int partition;
- private final Iterable<InputAttemptIdentifier> pendingInputs;
-
- public FetchResult(String host, int port, int partition,
- Iterable<InputAttemptIdentifier> pendingInputs) {
- this.host = host;
- this.port = port;
- this.partition = partition;
- this.pendingInputs = pendingInputs;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public Iterable<InputAttemptIdentifier> getPendingInputs() {
- return pendingInputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
deleted file mode 100644
index f5339d3..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Base class for one fetched input. Each instance carries the
- * {@link InputAttemptIdentifier} it belongs to, a size, a {@link Type}, a
- * {@link FetchedInputCallback} and a numeric id taken from a class-wide
- * counter.
- *
- * <p>Equality and hashing are based solely on that id (plus the runtime class
- * for {@link #equals(Object)}), not on the input attempt identifier.
- *
- * <p>The {@code notify*} methods only forward to the callback; per their
- * original documentation they are meant to be invoked from the subclass's
- * {@link #commit()}, {@link #abort()} and {@link #free()} implementations.
- */
-@Private
-public abstract class FetchedInput {
-
-  /** Kind of fetched input. */
-  public enum Type {
-    WAIT, // TODO NEWTEZ Implement this, only if required.
-    MEMORY,
-    DISK,
-  }
-
-  /**
-   * Lifecycle states. This class only ever assigns {@link #PENDING}; the other
-   * transitions are presumably performed by subclasses (not visible here).
-   */
-  protected enum State {
-    PENDING, COMMITTED, ABORTED, FREED
-  }
-
-  // Shared by every instance; final so the counter reference can never be
-  // swapped out after class initialization.
-  private static final AtomicInteger ID_GEN = new AtomicInteger(0);
-
-  protected InputAttemptIdentifier inputAttemptIdentifier;
-  protected final long size;
-  protected final Type type;
-  protected final FetchedInputCallback callback;
-  protected final int id;
-  protected State state;
-
-  /**
-   * Initializes the common fields, draws the next id from the shared counter
-   * and sets the state to {@link State#PENDING}.
-   *
-   * @param type the kind of this input
-   * @param size the size of this input (units are not specified in this class)
-   * @param inputAttemptIdentifier identifier of the input attempt
-   * @param callbackHandler callback that the {@code notify*} methods forward to
-   */
-  public FetchedInput(Type type, long size,
-      InputAttemptIdentifier inputAttemptIdentifier,
-      FetchedInputCallback callbackHandler) {
-    this.type = type;
-    this.size = size;
-    this.inputAttemptIdentifier = inputAttemptIdentifier;
-    this.callback = callbackHandler;
-    this.id = ID_GEN.getAndIncrement();
-    this.state = State.PENDING;
-  }
-
-  /** @return the type supplied at construction */
-  public Type getType() {
-    return this.type;
-  }
-
-  /** @return the size supplied at construction */
-  public long getSize() {
-    return this.size;
-  }
-
-  /** @return the identifier of the input attempt this object belongs to */
-  public InputAttemptIdentifier getInputAttemptIdentifier() {
-    return this.inputAttemptIdentifier;
-  }
-
-  /**
-   * Inform the Allocator about a committed resource.
-   * This should be called by commit
-   */
-  public void notifyFetchComplete() {
-    this.callback.fetchComplete(this);
-  }
-
-  /**
-   * Inform the Allocator about a failed resource.
-   * This should be called by abort
-   */
-  public void notifyFetchFailure() {
-    this.callback.fetchFailed(this);
-  }
-
-  /**
-   * Inform the Allocator about a completed resource being released.
-   * This should be called by free
-   */
-  public void notifyFreedResource() {
-    this.callback.freeResources(this);
-  }
-
-  /**
-   * Returns the output stream to be used to write fetched data. Users are
-   * expected to close the OutputStream when they're done
-   *
-   * @throws IOException declared for implementations to report I/O failures
-   */
-  public abstract OutputStream getOutputStream() throws IOException;
-
-  /**
-   * Return an input stream to be used to read the previously fetched data.
-   * Users are expected to close the InputStream when they're done
-   *
-   * @throws IOException declared for implementations to report I/O failures
-   */
-  public abstract InputStream getInputStream() throws IOException;
-
-  /**
-   * Commit the output. Should be idempotent
-   *
-   * @throws IOException declared for implementations to report I/O failures
-   */
-  public abstract void commit() throws IOException;
-
-  /**
-   * Abort the output. Should be idempotent
-   *
-   * @throws IOException declared for implementations to report I/O failures
-   */
-  public abstract void abort() throws IOException;
-
-  /**
-   * Called when this input has been consumed, so that resources can be
-   * reclaimed.
-   */
-  public abstract void free();
-
-  /** @return the id assigned at construction */
-  @Override
-  public int hashCode() {
-    return id;
-  }
-
-  /**
-   * Two instances are equal when they have the same runtime class and the
-   * same id.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    return id == ((FetchedInput) obj).id;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
deleted file mode 100644
index 7e573f0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Factory contract for obtaining {@link FetchedInput} instances.
- */
-public interface FetchedInputAllocator {
-
-  /**
-   * Allocates a {@link FetchedInput} for the given input attempt.
-   *
-   * @param size the size to allocate for (presumably bytes; confirm against
-   *     implementations)
-   * @param inputAttemptIdentifier identifier of the input attempt
-   * @return the allocated input
-   * @throws IOException declared for implementations to report I/O failures
-   */
-  FetchedInput allocate(long size,
-      InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
deleted file mode 100644
index 2d2d73b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-/**
- * Callback through which a {@link FetchedInput} reports its lifecycle events.
- * Each method receives the {@link FetchedInput} that raised the event.
- */
-public interface FetchedInputCallback {
-
-  /**
-   * Invoked from {@link FetchedInput#notifyFetchComplete()}.
-   *
-   * @param fetchedInput the input reporting the event
-   */
-  void fetchComplete(FetchedInput fetchedInput);
-
-  /**
-   * Invoked from {@link FetchedInput#notifyFetchFailure()}.
-   *
-   * @param fetchedInput the input reporting the event
-   */
-  void fetchFailed(FetchedInput fetchedInput);
-
-  /**
-   * Invoked from {@link FetchedInput#notifyFreedResource()}.
-   *
-   * @param fetchedInput the input reporting the event
-   */
-  void freeResources(FetchedInput fetchedInput);
-
-}