You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:21 UTC
[14/50] [abbrv] Rename *.new* packages back to what they should be,
remove dead code from the old packages - mapreduce module -
tez-engine module (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
new file mode 100644
index 0000000..2e10a93
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * Task context given to an Input. Events sent through it are tagged with
+ * metadata naming this task's vertex and the source vertex of the Input.
+ */
+public class TezInputContextImpl extends TezTaskContextImpl
+    implements TezInputContext {
+
+  private final byte[] userPayload;
+  private final String sourceVertexName;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Builds the context and derives its unique identifier.
+   *
+   * @param conf configuration forwarded to the base context
+   * @param appAttemptNumber application attempt number forwarded to the base
+   *          context
+   * @param tezUmbilical umbilical that receives events and fatal errors
+   * @param taskVertexName name of the vertex this task belongs to
+   * @param sourceVertexName name of the vertex feeding this Input
+   * @param taskAttemptID attempt this context is bound to
+   * @param counters counters forwarded to the base context
+   * @param userPayload payload returned as-is by {@link #getUserPayload()}
+   * @param runtimeTask runtime task forwarded to the base context
+   * @param serviceConsumerMetadata per-service metadata forwarded to the base
+   *          context
+   */
+  @Private
+  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String sourceVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload,
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.sourceVertexName = sourceVertexName;
+    this.userPayload = userPayload;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+        taskVertexName, sourceVertexName, taskAttemptID);
+    // Layout: <dagId>_<taskVertex>_<taskIndex>_<attemptNumber>_<sourceVertex>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        sourceVertexName);
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this Input's metadata
+   * and hands the whole batch to the umbilical.
+   *
+   * @param events events to send
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * @return the payload array supplied at construction (not copied)
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * @return the source vertex name supplied at construction
+   */
+  @Override
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  /**
+   * Reports a fatal error through the base context, attributing it to this
+   * Input's metadata.
+   *
+   * @param exception cause of the failure, may be null
+   * @param message description of the failure, may be null
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
new file mode 100644
index 0000000..ef58de2
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * Task context given to an Output. Events sent through it are tagged with
+ * metadata naming this task's vertex and the destination vertex of the
+ * Output.
+ */
+public class TezOutputContextImpl extends TezTaskContextImpl
+    implements TezOutputContext {
+
+  private final byte[] userPayload;
+  private final String destinationVertexName;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Builds the context and derives its unique identifier.
+   *
+   * @param conf configuration forwarded to the base context
+   * @param appAttemptNumber application attempt number forwarded to the base
+   *          context
+   * @param tezUmbilical umbilical that receives events and fatal errors
+   * @param taskVertexName name of the vertex this task belongs to
+   * @param destinationVertexName name of the vertex this Output writes to
+   * @param taskAttemptID attempt this context is bound to
+   * @param counters counters forwarded to the base context
+   * @param userPayload payload returned as-is by {@link #getUserPayload()}
+   * @param runtimeTask runtime task forwarded to the base context
+   * @param serviceConsumerMetadata per-service metadata forwarded to the base
+   *          context
+   */
+  @Private
+  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String destinationVertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.destinationVertexName = destinationVertexName;
+    this.userPayload = userPayload;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        taskVertexName, destinationVertexName, taskAttemptID);
+    // Layout: <dagId>_<taskVertex>_<taskIndex>_<attemptNumber>_<destVertex>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        destinationVertexName);
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this Output's metadata
+   * and hands the whole batch to the umbilical.
+   *
+   * @param events events to send
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * @return the payload array supplied at construction (not copied)
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * @return the destination vertex name supplied at construction
+   */
+  @Override
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  /**
+   * Reports a fatal error through the base context, attributing it to this
+   * Output's metadata.
+   *
+   * @param exception cause of the failure, may be null
+   * @param message description of the failure, may be null
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
new file mode 100644
index 0000000..3f20d5c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * Task context given to a Processor. Events sent through it are tagged with
+ * metadata naming this task's vertex; the edge-vertex slot of the metadata is
+ * the empty string.
+ */
+public class TezProcessorContextImpl extends TezTaskContextImpl
+    implements TezProcessorContext {
+
+  private final byte[] userPayload;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Builds the context and derives its unique identifier.
+   *
+   * @param conf configuration forwarded to the base context
+   * @param appAttemptNumber application attempt number forwarded to the base
+   *          context
+   * @param tezUmbilical umbilical that receives events, fatal errors and
+   *          commit queries
+   * @param vertexName name of the vertex this task belongs to
+   * @param taskAttemptID attempt this context is bound to
+   * @param counters counters forwarded to the base context
+   * @param userPayload payload returned as-is by {@link #getUserPayload()}
+   * @param runtimeTask runtime task that receives progress updates
+   * @param serviceConsumerMetadata per-service metadata forwarded to the base
+   *          context
+   */
+  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String vertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, vertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    // taskVertexName is the inherited field populated by the super call.
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
+        taskVertexName, "", taskAttemptID);
+    // Layout: <dagId>_<taskVertex>_<taskIndex>_<attemptNumber>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber());
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this Processor's
+   * metadata and hands the whole batch to the umbilical.
+   *
+   * @param events events to send
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * @return the payload array supplied at construction (not copied)
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * Forwards the progress value to the runtime task.
+   *
+   * @param progress progress value to record
+   */
+  @Override
+  public void setProgress(float progress) {
+    runtimeTask.setProgress(progress);
+  }
+
+  /**
+   * Reports a fatal error through the base context, attributing it to this
+   * Processor's metadata.
+   *
+   * @param exception cause of the failure, may be null
+   * @param message description of the failure, may be null
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+  /**
+   * Asks the umbilical whether this task attempt may commit.
+   *
+   * @return the umbilical's answer for this attempt
+   * @throws IOException if the umbilical call fails
+   */
+  @Override
+  public boolean canCommit() throws IOException {
+    return tezUmbilical.canCommit(taskAttemptID);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
new file mode 100644
index 0000000..2312c49
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * Base class for the Input, Output and Processor contexts. It holds the state
+ * shared by all three (configuration, attempt id, counters, work
+ * directories, umbilical) and answers the identity queries of
+ * {@link TezTaskContext} from the task attempt id.
+ */
+public abstract class TezTaskContextImpl implements TezTaskContext {
+
+  private static final String[] NO_WORK_DIRS = new String[0];
+
+  private final Configuration conf;
+  protected final String taskVertexName;
+  protected final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private final String[] workDirs;
+  /** Assigned by each concrete subclass at the end of its constructor. */
+  protected String uniqueIdentifier;
+  protected final RuntimeTask runtimeTask;
+  protected final TezUmbilical tezUmbilical;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
+  private final int appAttemptNumber;
+
+  /**
+   * Stores the shared state and reads the work directories from
+   * {@link TezJobConfig#LOCAL_DIRS}.
+   *
+   * @param conf configuration the work directories are read from
+   * @param appAttemptNumber value reported by {@link #getDAGAttemptNumber()}
+   * @param taskVertexName name of the vertex this task belongs to
+   * @param taskAttemptID attempt the identity getters are derived from
+   * @param counters counters returned by {@link #getCounters()}
+   * @param runtimeTask runtime task notified of fatal errors
+   * @param tezUmbilical umbilical notified of fatal errors
+   * @param serviceConsumerMetadata per-service metadata, keyed by service
+   *          name; may be null
+   */
+  @Private
+  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
+      String taskVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, RuntimeTask runtimeTask,
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    this.conf = conf;
+    this.taskVertexName = taskVertexName;
+    this.taskAttemptID = taskAttemptID;
+    this.counters = counters;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    // getStrings yields null when the property is unset; normalise to an
+    // empty array so getWorkDirs() never dereferences null.
+    String[] localDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+    this.workDirs = localDirs != null ? localDirs : NO_WORK_DIRS;
+    this.runtimeTask = runtimeTask;
+    this.tezUmbilical = tezUmbilical;
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
+    // TODO NEWTEZ at some point dag attempt should not map to app attempt
+    this.appAttemptNumber = appAttemptNumber;
+  }
+
+  /**
+   * @return the application id reached through the attempt's DAG id
+   */
+  @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId()
+        .getApplicationId();
+  }
+
+  /**
+   * @return the numeric id of the task this attempt belongs to
+   */
+  @Override
+  public int getTaskIndex() {
+    return taskAttemptID.getTaskID().getId();
+  }
+
+  /**
+   * @return the application attempt number supplied at construction
+   */
+  @Override
+  public int getDAGAttemptNumber() {
+    return appAttemptNumber;
+  }
+
+  /**
+   * @return the numeric id of the task attempt
+   */
+  @Override
+  public int getTaskAttemptNumber() {
+    return taskAttemptID.getId();
+  }
+
+  /**
+   * @return the string form of the DAG id, used as a stand-in for the name
+   */
+  @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+
+  /**
+   * @return the vertex name supplied at construction
+   */
+  @Override
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+  /**
+   * @return the counters supplied at construction
+   */
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  /**
+   * @return a fresh copy of the configured work directories; empty when
+   *         {@link TezJobConfig#LOCAL_DIRS} was not set
+   */
+  @Override
+  public String[] getWorkDirs() {
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+
+  /**
+   * @return the identifier assigned by the concrete subclass
+   */
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+
+  /**
+   * Looks up the consumer-side metadata registered for a service.
+   *
+   * @param serviceName key into the metadata map supplied at construction
+   * @return a rewound read-only view of the registered buffer, or null when
+   *         nothing is registered under {@code serviceName}
+   */
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    if (serviceConsumerMetadata == null) {
+      return null;
+    }
+    ByteBuffer metaData = serviceConsumerMetadata.get(serviceName);
+    if (metaData == null) {
+      return null;
+    }
+    // The read-only duplicate has its own position, so rewinding it leaves
+    // the stored buffer untouched. The cast is needed because rewind() is
+    // declared to return Buffer.
+    return (ByteBuffer) metaData.asReadOnlyBuffer().rewind();
+  }
+
+  /**
+   * Looks up the provider-side data for a service in this process's
+   * environment variables.
+   *
+   * @param serviceName name of the auxiliary service
+   * @return whatever {@link AuxiliaryServiceHelper#getServiceDataFromEnv}
+   *         returns for that name
+   */
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        serviceName, System.getenv());
+  }
+
+  /**
+   * Records a fatal error on the runtime task and reports it over the
+   * umbilical with a diagnostics string built from whichever of the
+   * throwable and message is present.
+   *
+   * @param t cause of the failure, may be null
+   * @param message description of the failure, may be null
+   * @param sourceInfo metadata identifying the component that failed
+   */
+  protected void signalFatalError(Throwable t, String message,
+      EventMetaData sourceInfo) {
+    runtimeTask.setFatalError(t, message);
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
+          + ", errorMessage=" + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      // Exactly one of the two is available.
+      diagnostics = t != null ?
+          "exceptionThrown=" + StringUtils.stringifyException(t)
+          : " errorMessage=" + message;
+    }
+    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
new file mode 100644
index 0000000..925d87b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Outbound channel used by the task context implementations to send events,
+ * report fatal errors and ask for commit permission.
+ */
+public interface TezUmbilical {
+
+  /**
+   * Accepts a batch of events produced by a task component.
+   *
+   * @param events events to hand over
+   */
+  void addEvents(Collection<TezEvent> events);
+
+  /**
+   * Reports that a task attempt hit a fatal error.
+   *
+   * @param taskAttemptID attempt that failed
+   * @param diagnostics human-readable description of the failure
+   * @param sourceInfo metadata identifying the component that failed
+   */
+  void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics,
+      EventMetaData sourceInfo);
+
+  /**
+   * Asks whether the given task attempt may commit.
+   *
+   * @param taskAttemptID attempt asking for permission
+   * @return true if the attempt may commit
+   * @throws IOException if the query cannot be completed
+   */
+  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
index 78d2e0c..927f0ad 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
@@ -26,7 +26,7 @@ import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
import org.apache.tez.engine.shuffle.common.FetchedInput;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index b36c240..0b86a8e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.api.KVReader;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.newapi.KVReader;
import org.apache.tez.engine.shuffle.common.FetchedInput;
import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
index 9f3dbbe..84ddd28 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
@@ -29,13 +29,13 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.TezEngineUtils;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
-import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.newapi.TezOutputContext;
public class FileBasedKVWriter implements KVWriter {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index 3920ce6..ab78d82 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -30,8 +30,8 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.Partitioner;
import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.TezTaskContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
deleted file mode 100644
index bf504bb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class CombineInput implements Input {
-
- private final TezRawKeyValueIterator input;
- private TezCounter inputValueCounter;
- private TezCounter inputKeyCounter;
- private RawComparator<Object> comparator;
- private Object key; // current key
- private Object value; // current value
- private boolean firstValue = false; // first value in key
- private boolean nextKeyIsSame = false; // more w/ this key
- private boolean hasMore; // more in file
- protected Progressable reporter;
- private Deserializer keyDeserializer;
- private Deserializer valueDeserializer;
- private DataInputBuffer buffer = new DataInputBuffer();
- private BytesWritable currentRawKey = new BytesWritable();
- private ValueIterable iterable = new ValueIterable();
-
- public CombineInput(TezRawKeyValueIterator kvIter) {
- this.input = kvIter;
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- }
-
- public boolean hasNext() throws IOException, InterruptedException {
- while (hasMore && nextKeyIsSame) {
- nextKeyValue();
- }
- if (hasMore) {
- if (inputKeyCounter != null) {
- inputKeyCounter.increment(1);
- }
- return nextKeyValue();
- } else {
- return false;
- }
- }
-
- private boolean nextKeyValue() throws IOException, InterruptedException {
- if (!hasMore) {
- key = null;
- value = null;
- return false;
- }
- firstValue = !nextKeyIsSame;
- DataInputBuffer nextKey = input.getKey();
- currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
- nextKey.getLength() - nextKey.getPosition());
- buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
- key = keyDeserializer.deserialize(key);
- DataInputBuffer nextVal = input.getValue();
- buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
- value = valueDeserializer.deserialize(value);
-
- hasMore = input.next();
- if (hasMore) {
- nextKey = input.getKey();
- nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
- currentRawKey.getLength(),
- nextKey.getData(),
- nextKey.getPosition(),
- nextKey.getLength() - nextKey.getPosition()
- ) == 0;
- } else {
- nextKeyIsSame = false;
- }
- inputValueCounter.increment(1);
- return true;
- }
-
- public Object getNextKey() throws IOException, InterruptedException {
- return key;
- }
-
- public Iterable getNextValues() throws IOException,
- InterruptedException {
- return iterable;
- }
-
- public float getProgress() throws IOException, InterruptedException {
- return input.getProgress().getProgress();
- }
-
- public void close() throws IOException {
- input.close();
- }
-
- public TezRawKeyValueIterator getIterator() {
- return this.input;
- }
-
- protected class ValueIterator implements Iterator<Object> {
-
-
- public boolean hasNext() {
- return firstValue || nextKeyIsSame;
- }
-
- public Object next() {
-
- // if this is the first record, we don't need to advance
- if (firstValue) {
- firstValue = false;
- return value;
- }
- // if this isn't the first record and the next key is different, they
- // can't advance it here.
- if (!nextKeyIsSame) {
- throw new NoSuchElementException("iterate past last value");
- }
- // otherwise, go to the next key/value pair
- try {
- nextKeyValue();
- return value;
- } catch (IOException ie) {
- throw new RuntimeException("next value iterator failed", ie);
- } catch (InterruptedException ie) {
- // this is bad, but we can't modify the exception list of java.util
- throw new RuntimeException("next value iterator interrupted", ie);
- }
- }
-
- public void remove() {
- throw new UnsupportedOperationException("remove not implemented");
- }
- }
-
-
-
- protected class ValueIterable implements Iterable<Object> {
- private ValueIterator iterator = new ValueIterator();
- public Iterator<Object> iterator() {
- return iterator;
- }
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
deleted file mode 100644
index 10a1b90..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.records.OutputContext;
-
-public class CombineOutput implements Output {
-
- private final Writer writer;
-
- public CombineOutput(Writer writer) {
- this.writer = writer;
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- // TODO Auto-generated method stub
-
- }
-
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- writer.append(key, value);
- }
-
- @Override
- public OutputContext getOutputContext() {
- return null;
- }
-
- public void close() throws IOException, InterruptedException {
- writer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
index 38b04d3..1cb89a7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
@@ -35,8 +35,8 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.newoutput.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.newapi.TezInputContext;
@SuppressWarnings({"rawtypes"})
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index 46851c7..b2a0b54 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
class MapOutput {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index ad9bb5f..bf2be4e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -54,7 +54,7 @@ import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.engine.hadoop.compat.NullProgressable;
import org.apache.tez.engine.newapi.TezInputContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
deleted file mode 100644
index 35d7723..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort;
-
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.engine.api.Output;
-
-/**
- * {@link SortingOutput} is an {@link Output} which sorts incoming key/value
- * pairs.
- */
-public interface SortingOutput extends Output {
-
- // TODO PreCommit rename
- public void setTask(RunningTaskContext runningTaskContext);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 1b5e015..8b4bd4e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.common.TezEngineUtils;
import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.hadoop.compat.NullProgressable;
import org.apache.tez.engine.newapi.TezOutputContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
deleted file mode 100644
index ae6a371..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.task.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/** Iterator to return Combined values */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class CombineValuesIterator<KEY,VALUE>
-extends ValuesIterator<KEY,VALUE> {
-
- private final TezCounter combineInputCounter;
-
- public CombineValuesIterator(TezRawKeyValueIterator in,
- RawComparator<KEY> comparator, Class<KEY> keyClass,
- Class<VALUE> valClass, Configuration conf, TezTaskReporter reporter,
- TezCounter combineInputCounter) throws IOException {
- super(in, comparator, keyClass, valClass, conf, reporter);
- this.combineInputCounter = combineInputCounter;
- }
-
- public VALUE next() {
- combineInputCounter.increment(1);
- return super.next();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
deleted file mode 100644
index bbe4e34..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from the Child running the Task.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezLocalTaskOutputFiles extends TezTaskOutput {
-
- public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
- super(conf, uniqueId);
- }
-
- private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-
- /**
- * Return the path to local map output file created earlier
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
- }
-
- /**
- * Create a local map output file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
- }
-
- /**
- * Create a local map output file name. This should *only* be used if the size
- * of the file is not known. Otherwise use the equivalent which accepts a size
- * parameter.
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputFileForWrite() throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
- + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
- conf);
- }
-
- /**
- * Create a local map output file name on the same volume.
- */
- @Override
- public Path getOutputFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- }
-
- /**
- * Return the path to a local map output index file created earlier
- *
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputIndexFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- conf);
- }
-
- /**
- * Create a local map output index file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getOutputIndexFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- size, conf);
- }
-
- /**
- * Create a local map output index file name on the same volume.
- */
- @Override
- public Path getOutputIndexFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(),
- Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- }
-
- /**
- * Return a local map spill file created earlier.
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", conf);
- }
-
- /**
- * Create a local map spill file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", size, conf);
- }
-
- /**
- * Return a local map spill index file created earlier
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillIndexFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", conf);
- }
-
- /**
- * Create a local map spill index file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getSpillIndexFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", size, conf);
- }
-
- /**
- * Return a local reduce input file created earlier
- *
- * @param mapId a map task id
- * @return path
- * @throws IOException
- */
- @Override
- public Path getInputFile(InputAttemptIdentifier mapId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
- Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
- }
-
- /**
- * Create a local reduce input file name.
- *
- * @param mapId a map task id
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- @Override
- public Path getInputFileForWrite(int taskId,
- long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
- size, conf);
- }
-
- /** Removes all of the files related to a task. */
- @Override
- public void removeAll()
- throws IOException {
- deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
- }
-
- private String[] getLocalDirs() throws IOException {
- return conf.getStrings(TezJobConfig.LOCAL_DIRS);
- }
-
- @SuppressWarnings("deprecation")
- private void deleteLocalFiles(String subdir) throws IOException {
- String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
deleted file mode 100644
index 87a5aec..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class TezTaskOutput {
-
- protected Configuration conf;
- protected String uniqueId;
-
- public TezTaskOutput(Configuration conf, String uniqueId) {
- this.conf = conf;
- this.uniqueId = uniqueId;
- }
-
- /**
- * Return the path to local map output file created earlier
- *
- * @return path
- * @throws IOException
- */
- public abstract Path getOutputFile() throws IOException;
-
- /**
- * Create a local map output file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public abstract Path getOutputFileForWrite(long size) throws IOException;
-
- /**
- * Create a local output file name. This method is meant to be used *only* if
- * the size of the file is not know up front.
- *
- * @return path
- * @throws IOException
- */
- public abstract Path getOutputFileForWrite() throws IOException;
-
- /**
- * Create a local map output file name on the same volume.
- */
- public abstract Path getOutputFileForWriteInVolume(Path existing);
-
- /**
- * Return the path to a local map output index file created earlier
- *
- * @return path
- * @throws IOException
- */
- public abstract Path getOutputIndexFile() throws IOException;
-
- /**
- * Create a local map output index file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
-
- /**
- * Create a local map output index file name on the same volume.
- */
- public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
-
- /**
- * Return a local map spill file created earlier.
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- public abstract Path getSpillFile(int spillNumber) throws IOException;
-
- /**
- * Create a local map spill file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public abstract Path getSpillFileForWrite(int spillNumber, long size)
- throws IOException;
-
- /**
- * Return a local map spill index file created earlier
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
-
- /**
- * Create a local map spill index file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
- throws IOException;
-
- /**
- * Return a local reduce input file created earlier
- *
- * @param attemptIdentifier The identifier for the source task
- * @return path
- * @throws IOException
- */
- public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
- /**
- * Create a local reduce input file name.
- *
- * @param taskIdentifier The identifier for the source task
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public abstract Path getInputFileForWrite(
- int taskIdentifier, long size) throws IOException;
-
- /** Removes all of the files related to a task. */
- public abstract void removeAll() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
deleted file mode 100644
index a37f05f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezTaskOutputFiles extends TezTaskOutput {
-
- public TezTaskOutputFiles(Configuration conf, String uniqueId) {
- super(conf, uniqueId);
- }
-
- private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
-
- private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
- private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
- + ".index";
-
-
-
- // assume configured to $localdir/usercache/$user/appcache/$appId
- private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-
- private Path getAttemptOutputDir() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getAttemptOutputDir: "
- + Constants.TASK_OUTPUT_DIR + "/"
- + uniqueId);
- }
- return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
- }
-
- /**
- * Return the path to local map output file created earlier
- *
- * @return path
- * @throws IOException
- */
- public Path getOutputFile() throws IOException {
- Path attemptOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
- }
-
- /**
- * Create a local map output file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public Path getOutputFileForWrite(long size) throws IOException {
- Path attemptOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
- }
-
- /**
- * Create a local map output file name. This should *only* be used if the size
- * of the file is not known. Otherwise use the equivalent which accepts a size
- * parameter.
- *
- * @return path
- * @throws IOException
- */
- public Path getOutputFileForWrite() throws IOException {
- Path attemptOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
- }
-
- /**
- * Create a local map output file name on the same volume.
- */
- public Path getOutputFileForWriteInVolume(Path existing) {
- Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
- Path attemptOutputDir = new Path(outputDir, uniqueId);
- return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
- }
-
- /**
- * Return the path to a local map output index file created earlier
- *
- * @return path
- * @throws IOException
- */
- public Path getOutputIndexFile() throws IOException {
- Path attemptIndexOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
- Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
- }
-
- /**
- * Create a local map output index file name.
- *
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public Path getOutputIndexFileForWrite(long size) throws IOException {
- Path attemptIndexOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
- Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
- size, conf);
- }
-
- /**
- * Create a local map output index file name on the same volume.
- */
- public Path getOutputIndexFileForWriteInVolume(Path existing) {
- Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
- Path attemptOutputDir = new Path(outputDir, uniqueId);
- return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
- Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- }
-
- /**
- * Return a local map spill file created earlier.
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- public Path getSpillFile(int spillNumber) throws IOException {
- return lDirAlloc.getLocalPathToRead(
- String.format(SPILL_FILE_PATTERN,
- uniqueId, spillNumber), conf);
- }
-
- /**
- * Create a local map spill file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public Path getSpillFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(
- String.format(String.format(SPILL_FILE_PATTERN,
- uniqueId, spillNumber)), size, conf);
- }
-
- /**
- * Return a local map spill index file created earlier
- *
- * @param spillNumber the number
- * @return path
- * @throws IOException
- */
- public Path getSpillIndexFile(int spillNumber) throws IOException {
- return lDirAlloc.getLocalPathToRead(
- String.format(SPILL_INDEX_FILE_PATTERN,
- uniqueId, spillNumber), conf);
- }
-
- /**
- * Create a local map spill index file name.
- *
- * @param spillNumber the number
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public Path getSpillIndexFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(
- String.format(SPILL_INDEX_FILE_PATTERN,
- uniqueId, spillNumber), size, conf);
- }
-
- /**
- * Return a local reduce input file created earlier
- *
- * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
- * @return path
- * @throws IOException
- */
- public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
- throw new UnsupportedOperationException("Incompatible with LocalRunner");
- }
-
- /**
- * Create a local reduce input file name.
- *
- * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
- * @param size the size of the file
- * @return path
- * @throws IOException
- */
- public Path getInputFileForWrite(int srcTaskId,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- uniqueId, getAttemptOutputDir().toString(), srcTaskId),
- size, conf);
- }
-
- /** Removes all of the files related to a task. */
- public void removeAll() throws IOException {
- throw new UnsupportedOperationException("Incompatible with LocalRunner");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
index 69484af..40e6b1a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
/**
* Manipulate the working area for the transient store for maps and reduces.
@@ -41,11 +41,13 @@ import org.apache.tez.dag.records.TezTaskID;
@InterfaceStability.Unstable
public class TezLocalTaskOutputFiles extends TezTaskOutput {
+ public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+ super(conf, uniqueId);
+ }
+
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
- public TezLocalTaskOutputFiles() {
- }
/**
* Return the path to local map output file created earlier
@@ -57,7 +59,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getOutputFile()
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, getConf());
+ + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
}
/**
@@ -71,7 +73,22 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getOutputFileForWrite(long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
- + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, getConf());
+ + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
+ }
+
+ /**
+ * Create a local map output file name. This should *only* be used if the size
+ * of the file is not known. Otherwise use the equivalent which accepts a size
+ * parameter.
+ *
+ * @return path
+ * @throws IOException
+ */
+ @Override
+ public Path getOutputFileForWrite() throws IOException {
+ return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
+ + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
+ conf);
}
/**
@@ -93,7 +110,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+ Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- getConf());
+ conf);
}
/**
@@ -108,7 +125,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+ Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
- size, getConf());
+ size, conf);
}
/**
@@ -131,7 +148,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getSpillFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", getConf());
+ + spillNumber + ".out", conf);
}
/**
@@ -146,7 +163,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out", size, getConf());
+ + spillNumber + ".out", size, conf);
}
/**
@@ -160,7 +177,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getSpillIndexFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", getConf());
+ + spillNumber + ".out.index", conf);
}
/**
@@ -175,7 +192,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
- + spillNumber + ".out.index", size, getConf());
+ + spillNumber + ".out.index", size, conf);
}
/**
@@ -186,11 +203,11 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
* @throws IOException
*/
@Override
- public Path getInputFile(int mapId)
+ public Path getInputFile(InputAttemptIdentifier mapId)
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
- Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId)), getConf());
+ Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
}
/**
@@ -202,12 +219,12 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
* @throws IOException
*/
@Override
- public Path getInputFileForWrite(TezTaskID mapId,
+ public Path getInputFileForWrite(int taskId,
long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, mapId.getId()),
- size, getConf());
+ Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
+ size, conf);
}
/** Removes all of the files related to a task. */
@@ -217,20 +234,15 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- }
-
private String[] getLocalDirs() throws IOException {
- return getConf().getStrings(TezJobConfig.LOCAL_DIRS);
+ return conf.getStrings(TezJobConfig.LOCAL_DIRS);
}
@SuppressWarnings("deprecation")
private void deleteLocalFiles(String subdir) throws IOException {
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(getConf()).delete(new Path(localDirs[i], subdir));
+ FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
index 50d270b..e1d83ad 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
@@ -23,9 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
/**
* Manipulate the working area for the transient store for maps and reduces.
@@ -38,12 +37,14 @@ import org.apache.tez.dag.records.TezTaskID;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public abstract class TezTaskOutput implements Configurable {
+public abstract class TezTaskOutput {
protected Configuration conf;
protected String uniqueId;
- public TezTaskOutput() {
+ public TezTaskOutput(Configuration conf, String uniqueId) {
+ this.conf = conf;
+ this.uniqueId = uniqueId;
}
/**
@@ -64,6 +65,15 @@ public abstract class TezTaskOutput implements Configurable {
public abstract Path getOutputFileForWrite(long size) throws IOException;
/**
+ * Create a local output file name. This method is meant to be used *only* if
+ * the size of the file is not known up front.
+ *
+ * @return path
+ * @throws IOException
+ */
+ public abstract Path getOutputFileForWrite() throws IOException;
+
+ /**
* Create a local map output file name on the same volume.
*/
public abstract Path getOutputFileForWriteInVolume(Path existing);
@@ -133,42 +143,23 @@ public abstract class TezTaskOutput implements Configurable {
/**
* Return a local reduce input file created earlier
*
- * @param mapId a map task id
+ * @param attemptIdentifier The identifier for the source task
* @return path
* @throws IOException
*/
- public abstract Path getInputFile(int mapId) throws IOException;
+ public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
/**
* Create a local reduce input file name.
*
- * @param mapId a map task id
+ * @param taskIdentifier The identifier for the source task
* @param size the size of the file
* @return path
* @throws IOException
*/
public abstract Path getInputFileForWrite(
- TezTaskID mapId, long size) throws IOException;
+ int taskIdentifier, long size) throws IOException;
/** Removes all of the files related to a task. */
public abstract void removeAll() throws IOException;
-
- public void setUniqueIdentifier(String uniqueId) {
- this.uniqueId = uniqueId;
- }
-
- public String getUniqueIdentifier() {
- return this.uniqueId;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index b7874f0..b8f051b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
/**
* Manipulate the working area for the transient store for maps and reduces.
@@ -40,32 +40,35 @@ import org.apache.tez.dag.records.TezTaskID;
* taskTracker/jobCache/jobId/attemptId
* This class should not be used from TaskTracker space.
*/
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TezTaskOutputFiles extends TezTaskOutput {
+
+ public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+ super(conf, uniqueId);
+ }
private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
- private Configuration conf;
private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+ ".index";
- public TezTaskOutputFiles() {
- }
+
// assume configured to $localdir/usercache/$user/appcache/$appId
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
private Path getAttemptOutputDir() {
if (LOG.isDebugEnabled()) {
LOG.debug("getAttemptOutputDir: "
+ Constants.TASK_OUTPUT_DIR + "/"
- + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ + uniqueId);
}
- return new Path(Constants.TASK_OUTPUT_DIR,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
}
/**
@@ -94,12 +97,25 @@ public class TezTaskOutputFiles extends TezTaskOutput {
}
/**
+ * Create a local map output file name. This should *only* be used if the size
+ * of the file is not known. Otherwise use the equivalent which accepts a size
+ * parameter.
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputFileForWrite() throws IOException {
+ Path attemptOutput =
+ new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+ return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
+ }
+
+ /**
* Create a local map output file name on the same volume.
*/
public Path getOutputFileForWriteInVolume(Path existing) {
Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
- Path attemptOutputDir = new Path(outputDir,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ Path attemptOutputDir = new Path(outputDir, uniqueId);
return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
}
@@ -136,8 +152,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
*/
public Path getOutputIndexFileForWriteInVolume(Path existing) {
Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
- Path attemptOutputDir = new Path(outputDir,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ Path attemptOutputDir = new Path(outputDir, uniqueId);
return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
}
@@ -152,7 +167,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public Path getSpillFile(int spillNumber) throws IOException {
return lDirAlloc.getLocalPathToRead(
String.format(SPILL_FILE_PATTERN,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+ uniqueId, spillNumber), conf);
}
/**
@@ -167,7 +182,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
throws IOException {
return lDirAlloc.getLocalPathForWrite(
String.format(String.format(SPILL_FILE_PATTERN,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber)), size, conf);
+ uniqueId, spillNumber)), size, conf);
}
/**
@@ -180,7 +195,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public Path getSpillIndexFile(int spillNumber) throws IOException {
return lDirAlloc.getLocalPathToRead(
String.format(SPILL_INDEX_FILE_PATTERN,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+ uniqueId, spillNumber), conf);
}
/**
@@ -195,33 +210,32 @@ public class TezTaskOutputFiles extends TezTaskOutput {
throws IOException {
return lDirAlloc.getLocalPathForWrite(
String.format(SPILL_INDEX_FILE_PATTERN,
- conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), size, conf);
+ uniqueId, spillNumber), size, conf);
}
/**
* Return a local reduce input file created earlier
*
- * @param mapId a map task id
+ * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
* @return path
* @throws IOException
*/
- public Path getInputFile(int mapId) throws IOException {
+ public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
throw new UnsupportedOperationException("Incompatible with LocalRunner");
}
/**
* Create a local reduce input file name.
*
- * @param mapId a map task id
+ * @param srcTaskId The identifier for the source task
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getInputFileForWrite(TezTaskID mapId,
+ public Path getInputFileForWrite(int srcTaskId,
long size) throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
- Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
- getAttemptOutputDir().toString(), mapId.getId()),
+ Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, getAttemptOutputDir().toString(), srcTaskId),
size, conf);
}
@@ -229,13 +243,4 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public void removeAll() throws IOException {
throw new UnsupportedOperationException("Incompatible with LocalRunner");
}
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index eccd119..c719fba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -29,12 +29,12 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.KVReader;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.ValuesIterator;
import org.apache.tez.engine.common.shuffle.impl.Shuffle;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
import org.apache.tez.engine.newapi.LogicalInput;
import org.apache.tez.engine.newapi.TezInputContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
deleted file mode 100644
index 269fe81..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldinput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * {@link LocalMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer.
- */
-public class LocalMergedInput extends OldShuffledMergedInput {
-
- public LocalMergedInput(TezEngineTaskContext task, int index) {
- super(task, index);
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- }
-
- public boolean hasNext() throws IOException, InterruptedException {
- return false;
- }
-
- public Object getNextKey() throws IOException, InterruptedException {
- return null;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Iterable getNextValues()
- throws IOException, InterruptedException {
- return null;
- }
-
- public float getProgress() throws IOException, InterruptedException {
- return 0f;
- }
-
- public void close() throws IOException {
- }
-
- public TezRawKeyValueIterator getIterator() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
deleted file mode 100644
index c046a27..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldinput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * {@link OldShuffledMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer.
- */
-public class OldShuffledMergedInput implements Input {
-
-
- public OldShuffledMergedInput(TezEngineTaskContext task, int index) {
- }
-
- public void mergeWith(OldShuffledMergedInput other) {
- }
-
- public void setTask(RunningTaskContext runningTaskContext) {
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- }
-
- public boolean hasNext() throws IOException, InterruptedException {
- return false;
- }
-
- public Object getNextKey() throws IOException, InterruptedException {
- return null;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Iterable getNextValues()
- throws IOException, InterruptedException {
- return null;
- }
-
- public float getProgress() throws IOException, InterruptedException {
- return 0f;
- }
-
- public void close() throws IOException {
- }
-
- public TezRawKeyValueIterator getIterator() {
- return null;
- }
-
-}