You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:26 UTC
[17/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
deleted file mode 100644
index dc1a447..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-/**
- * Writable heartbeat request. Carries the events being reported, the
- * identifier of the sending container, the task attempt currently associated
- * with it (may be null), and the window of events the sender asks for:
- * at most {@code maxEvents} starting at {@code startIndex}.
- *
- * <p>Serialized field order (see {@link #write(DataOutput)}): optional event
- * list, optional task attempt id, startIndex, maxEvents, requestId,
- * containerIdentifier.
- */
-public class TezHeartbeatRequest implements Writable {
-
-  private String containerIdentifier;
-  private List<TezEvent> events;
-  private TezTaskAttemptID currentTaskAttemptID;
-  private int startIndex;
-  private int maxEvents;
-  private long requestId;
-
-  /** No-arg constructor for deserialization; populate via {@link #readFields(DataInput)}. */
-  public TezHeartbeatRequest() {
-  }
-
-  /**
-   * @param requestId id of this request
-   * @param events events being reported; may be null (serialized as "no
-   *          events"). Wrapped in an unmodifiable view, not copied.
-   * @param containerIdentifier identifier of the sending container; must be
-   *          non-null when the request is serialized (written via Text.writeString)
-   * @param taskAttemptID current task attempt; may be null
-   * @param startIndex start of the requested event window (interpreted by the receiver)
-   * @param maxEvents maximum number of events requested
-   */
-  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
-      String containerIdentifier, TezTaskAttemptID taskAttemptID,
-      int startIndex, int maxEvents) {
-    this.containerIdentifier = containerIdentifier;
-    this.requestId = requestId;
-    // write() already encodes a null list, so accept one here instead of
-    // failing inside Collections.unmodifiableList.
-    this.events = events == null ? null : Collections.unmodifiableList(events);
-    this.startIndex = startIndex;
-    this.maxEvents = maxEvents;
-    this.currentTaskAttemptID = taskAttemptID;
-  }
-
-  public String getContainerIdentifier() {
-    return containerIdentifier;
-  }
-
-  /** @return the reported events, or null if none were attached. */
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public int getStartIndex() {
-    return startIndex;
-  }
-
-  public int getMaxEvents() {
-    return maxEvents;
-  }
-
-  public long getRequestId() {
-    return requestId;
-  }
-
-  /** @return the current task attempt, or null if none is set. */
-  public TezTaskAttemptID getCurrentTaskAttemptID() {
-    return currentTaskAttemptID;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // Nullable fields are preceded by a presence flag.
-    if (events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-    if (currentTaskAttemptID != null) {
-      out.writeBoolean(true);
-      currentTaskAttemptID.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-    out.writeInt(startIndex);
-    out.writeInt(maxEvents);
-    out.writeLong(requestId);
-    Text.writeString(out, containerIdentifier);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    if (in.readBoolean()) {
-      int eventsCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventsCount);
-      for (int i = 0; i < eventsCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    } else {
-      // Writable instances may be reused; drop any list from a previous read.
-      events = null;
-    }
-    if (in.readBoolean()) {
-      currentTaskAttemptID = new TezTaskAttemptID();
-      currentTaskAttemptID.readFields(in);
-    } else {
-      currentTaskAttemptID = null;
-    }
-    startIndex = in.readInt();
-    maxEvents = in.readInt();
-    requestId = in.readLong();
-    containerIdentifier = Text.readString(in);
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " containerId=" + containerIdentifier
-        + ", requestId=" + requestId
-        + ", startIndex=" + startIndex
-        + ", maxEventsToGet=" + maxEvents
-        + ", taskAttemptId=" + currentTaskAttemptID
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
deleted file mode 100644
index 22ae7eb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Writable heartbeat response: the last request id, a one-way "should die"
- * flag, and an optional (possibly null) list of events.
- *
- * <p>Serialized field order: lastRequestId, shouldDie, optional event list.
- */
-public class TezHeartbeatResponse implements Writable {
-
-  private long lastRequestId;
-  private boolean shouldDie = false;
-  private List<TezEvent> events;
-
-  /** No-arg constructor for deserialization; populate via {@link #readFields(DataInput)}. */
-  public TezHeartbeatResponse() {
-  }
-
-  /** @param events events to carry; may be null. Wrapped, not copied. */
-  public TezHeartbeatResponse(List<TezEvent> events) {
-    this.events = unmodifiableOrNull(events);
-  }
-
-  /** @return the carried events, or null if none are attached. */
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public boolean shouldDie() {
-    return shouldDie;
-  }
-
-  public long getLastRequestId() {
-    return lastRequestId;
-  }
-
-  /** @param events events to carry; may be null. Wrapped, not copied. */
-  public void setEvents(List<TezEvent> events) {
-    this.events = unmodifiableOrNull(events);
-  }
-
-  public void setLastRequestId(long lastRequestId) {
-    this.lastRequestId = lastRequestId;
-  }
-
-  /** Sets the should-die flag; there is no way to clear it again. */
-  public void setShouldDie() {
-    this.shouldDie = true;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(lastRequestId);
-    out.writeBoolean(shouldDie);
-    // Presence flag, then count and events.
-    if (events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    lastRequestId = in.readLong();
-    shouldDie = in.readBoolean();
-    if (in.readBoolean()) {
-      int eventCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventCount);
-      for (int i = 0; i < eventCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    } else {
-      // Writable instances may be reused; drop any list from a previous read.
-      events = null;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " lastRequestId=" + lastRequestId
-        + ", shouldDie=" + shouldDie
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-
-  /** Unmodifiable view of {@code list}, or null when {@code list} is null (write() encodes null). */
-  private static List<TezEvent> unmodifiableOrNull(List<TezEvent> list) {
-    return list == null ? null : Collections.unmodifiableList(list);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
deleted file mode 100644
index 245cd3b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-/**
- * Input-side task context bound to a source vertex name. Events sent through
- * it are wrapped with INPUT {@link EventMetaData} naming this task's vertex
- * and the source vertex.
- */
-public class TezInputContextImpl extends TezTaskContextImpl
-    implements TezInputContext {
-
-  private final byte[] userPayload;
-  private final String sourceVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, byte[] userPayload,
-      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.sourceVertexName = sourceVertexName;
-    this.userPayload = userPayload;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
-        taskVertexName, sourceVertexName, taskAttemptID);
-    // Layout: <dagId>_<taskVertex>_<taskIndex, 6 digits>_<attempt, 2 digits>_<sourceVertex>
-    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
-        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
-        sourceVertexName);
-  }
-
-  @Override
-  public String getSourceVertexName() {
-    return sourceVertexName;
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  /** Wraps each event with this input's metadata and hands the batch to the umbilical. */
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
-    for (Event event : events) {
-      wrapped.add(new TezEvent(event, sourceInfo));
-    }
-    tezUmbilical.addEvents(wrapped);
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
deleted file mode 100644
index 6b42e13..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-/**
- * Output-side task context bound to a destination vertex name. Events sent
- * through it are wrapped with OUTPUT {@link EventMetaData} naming this task's
- * vertex and the destination vertex.
- */
-public class TezOutputContextImpl extends TezTaskContextImpl
-    implements TezOutputContext {
-
-  private final byte[] userPayload;
-  private final String destinationVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String destinationVertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.destinationVertexName = destinationVertexName;
-    this.userPayload = userPayload;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
-        taskVertexName, destinationVertexName, taskAttemptID);
-    // Layout: <dagId>_<taskVertex>_<taskIndex, 6 digits>_<attempt, 2 digits>_<destVertex>
-    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
-        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
-        destinationVertexName);
-  }
-
-  @Override
-  public String getDestinationVertexName() {
-    return destinationVertexName;
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  /** Wraps each event with this output's metadata and hands the batch to the umbilical. */
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
-    for (Event event : events) {
-      wrapped.add(new TezEvent(event, sourceInfo));
-    }
-    tezUmbilical.addEvents(wrapped);
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
deleted file mode 100644
index 7ffcfd6..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-/**
- * Task context for a processor. Events it sends carry PROCESSOR
- * {@link EventMetaData} with an empty edge-vertex name. Progress updates go
- * to the {@link RuntimeTask}; commit checks are delegated to the umbilical.
- */
-public class TezProcessorContextImpl extends TezTaskContextImpl
-    implements TezProcessorContext {
-
-  private final byte[] userPayload;
-  private final EventMetaData sourceInfo;
-
-  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, vertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    // A processor has no edge vertex, so that slot is the empty string.
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
-        taskVertexName, "", taskAttemptID);
-    // Layout: <dagId>_<taskVertex>_<taskIndex, 6 digits>_<attempt, 2 digits>
-    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", dagId,
-        taskVertexName, getTaskIndex(), getTaskAttemptNumber());
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  /** Wraps each event with this processor's metadata and hands the batch to the umbilical. */
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
-    for (Event event : events) {
-      wrapped.add(new TezEvent(event, sourceInfo));
-    }
-    tezUmbilical.addEvents(wrapped);
-  }
-
-  @Override
-  public void setProgress(float progress) {
-    runtimeTask.setProgress(progress);
-  }
-
-  @Override
-  public boolean canCommit() throws IOException {
-    return tezUmbilical.canCommit(taskAttemptID);
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
deleted file mode 100644
index d5a4037..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.TezTaskContext;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public abstract class TezTaskContextImpl implements TezTaskContext {
-
- private final Configuration conf;
- protected final String taskVertexName;
- protected final TezTaskAttemptID taskAttemptID;
- private final TezCounters counters;
- private String[] workDirs;
- protected String uniqueIdentifier;
- protected final RuntimeTask runtimeTask;
- protected final TezUmbilical tezUmbilical;
- private final Map<String, ByteBuffer> serviceConsumerMetadata;
- private final int appAttemptNumber;
-
- @Private
- public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
- String taskVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters, RuntimeTask runtimeTask,
- TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
- this.conf = conf;
- this.taskVertexName = taskVertexName;
- this.taskAttemptID = taskAttemptID;
- this.counters = counters;
- // TODO Maybe change this to be task id specific at some point. For now
- // Shuffle code relies on this being a path specified by YARN
- this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
- this.runtimeTask = runtimeTask;
- this.tezUmbilical = tezUmbilical;
- this.serviceConsumerMetadata = serviceConsumerMetadata;
- // TODO NEWTEZ at some point dag attempt should not map to app attempt
- this.appAttemptNumber = appAttemptNumber;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return taskAttemptID.getTaskID().getVertexID().getDAGId()
- .getApplicationId();
- }
-
- @Override
- public int getTaskIndex() {
- return taskAttemptID.getTaskID().getId();
- }
-
- @Override
- public int getDAGAttemptNumber() {
- return appAttemptNumber;
- }
-
- @Override
- public int getTaskAttemptNumber() {
- return taskAttemptID.getId();
- }
-
- @Override
- public String getDAGName() {
- // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
- // the unique identifier.
- return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
- }
-
- @Override
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
-
- @Override
- public TezCounters getCounters() {
- return counters;
- }
-
- @Override
- public String[] getWorkDirs() {
- return Arrays.copyOf(workDirs, workDirs.length);
- }
-
- @Override
- public String getUniqueIdentifier() {
- return uniqueIdentifier;
- }
-
- @Override
- public ByteBuffer getServiceConsumerMetaData(String serviceName) {
- return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
- .asReadOnlyBuffer().rewind();
- }
-
- @Override
- public ByteBuffer getServiceProviderMetaData(String serviceName) {
- return AuxiliaryServiceHelper.getServiceDataFromEnv(
- serviceName, System.getenv());
- }
-
- protected void signalFatalError(Throwable t, String message,
- EventMetaData sourceInfo) {
- runtimeTask.setFatalError(t, message);
- String diagnostics;
- if (t != null && message != null) {
- diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
- + ", errorMessage=" + message;
- } else if (t == null && message == null) {
- diagnostics = "Unknown error";
- } else {
- diagnostics = t != null ?
- "exceptionThrown=" + StringUtils.stringifyException(t)
- : " errorMessage=" + message;
- }
- tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
deleted file mode 100644
index 925d87b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Callback used by the task contexts to forward events, report fatal errors
- * and ask whether an attempt may commit. What happens on the other side is
- * up to the implementation.
- */
-public interface TezUmbilical {
-
-  /** Hands over a batch of events produced by a task's inputs, outputs or processor. */
-  void addEvents(Collection<TezEvent> events);
-
-  /**
-   * Reports a fatal error for an attempt.
-   *
-   * @param taskAttemptID the failing attempt
-   * @param diagnostics human-readable description of the failure
-   * @param sourceInfo metadata identifying the reporting input/output/processor
-   */
-  void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics,
-      EventMetaData sourceInfo);
-
-  /**
-   * Asks whether the given attempt may commit; the decision is made by the implementation.
-   *
-   * @throws IOException if the check cannot be completed
-   */
-  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
deleted file mode 100644
index 1211598..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.engine.shuffle.common.FetchedInputCallback;
-import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastInputManager implements FetchedInputAllocator,
- FetchedInputCallback {
-
- private final Configuration conf;
-
- private final TezTaskOutputFiles fileNameAllocator;
- private final LocalDirAllocator localDirAllocator;
-
- // Configuration parameters
- private final long memoryLimit;
- private final long maxSingleShuffleLimit;
-
- private long usedMemory = 0;
-
- public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
- this.conf = conf;
-
- this.fileNameAllocator = new TezTaskOutputFiles(conf,
- inputContext.getUniqueIdentifier());
- this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
- // Setup configuration
- final float maxInMemCopyUse = conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
- if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
- + maxInMemCopyUse);
- }
-
- // Allow unit tests to fix Runtime memory
- this.memoryLimit = (long) (conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
-
- final float singleShuffleMemoryLimitPercent = conf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
- if (singleShuffleMemoryLimitPercent <= 0.0f
- || singleShuffleMemoryLimitPercent > 1.0f) {
- throw new IllegalArgumentException("Invalid value for "
- + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
- + singleShuffleMemoryLimitPercent);
- }
-
- this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
- }
-
- @Override
- public synchronized FetchedInput allocate(long size,
- InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
- if (size > maxSingleShuffleLimit
- || this.usedMemory + size > this.memoryLimit) {
- return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
- localDirAllocator, fileNameAllocator);
- } else {
- this.usedMemory += size;
- return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
- }
- }
-
- @Override
- public void fetchComplete(FetchedInput fetchedInput) {
- switch (fetchedInput.getType()) {
- // Not tracking anything here.
- case DISK:
- case MEMORY:
- break;
- default:
- throw new TezUncheckedException("InputType: " + fetchedInput.getType()
- + " not expected for Broadcast fetch");
- }
- }
-
- @Override
- public void fetchFailed(FetchedInput fetchedInput) {
- cleanup(fetchedInput);
- }
-
- @Override
- public void freeResources(FetchedInput fetchedInput) {
- cleanup(fetchedInput);
- }
-
- private void cleanup(FetchedInput fetchedInput) {
- switch (fetchedInput.getType()) {
- case DISK:
- break;
- case MEMORY:
- unreserve(fetchedInput.getSize());
- break;
- default:
- throw new TezUncheckedException("InputType: " + fetchedInput.getType()
- + " not expected for Broadcast fetch");
- }
- }
-
- private synchronized void unreserve(long size) {
- this.usedMemory -= size;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
deleted file mode 100644
index 2c53e75..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
-import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastKVReader<K, V> implements KVReader {
-
- private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
-
- private final BroadcastShuffleManager shuffleManager;
- private final Configuration conf;
- private final CompressionCodec codec;
-
- private final Class<K> keyClass;
- private final Class<V> valClass;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valDeserializer;
- private final DataInputBuffer keyIn;
- private final DataInputBuffer valIn;
-
- private final SimpleValueIterator valueIterator;
- private final SimpleIterable valueIterable;
-
- private K key;
- private V value;
-
- // The input being read and the reader over it. Both are null before the first
- // input is opened and again as soon as the previous input has been released.
- private FetchedInput currentFetchedInput;
- private IFile.Reader currentReader;
-
- /**
- * Creates a reader that walks the inputs handed out by {@code shuffleManager}.
- *
- * @param shuffleManager source of fetched inputs
- * @param conf supplies the intermediate key/value classes and compression settings
- * @throws IllegalStateException if the key/value deserializers cannot be opened
- */
- public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
- Configuration conf) {
- this.shuffleManager = shuffleManager;
- this.conf = conf;
-
- if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
-
- this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
- // Values must be deserialized with the intermediate *value* class.
- this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-
- this.keyIn = new DataInputBuffer();
- this.valIn = new DataInputBuffer();
-
- SerializationFactory serializationFactory = new SerializationFactory(conf);
-
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.valDeserializer = serializationFactory.getDeserializer(valClass);
- // Deserializer.deserialize() reads from the stream passed to open(); bind each
- // deserializer to the buffer that readNextFromCurrentReader() refills per record.
- try {
- this.keyDeserializer.open(keyIn);
- this.valDeserializer.open(valIn);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to open key/value deserializers", e);
- }
-
- this.valueIterator = new SimpleValueIterator();
- this.valueIterable = new SimpleIterable(this.valueIterator);
- }
-
- // TODO NEWTEZ Maybe add an interface to check whether next will block.
-
- /**
- * Moves to the next key/value(s) pair. May block while waiting for the shuffle
- * manager to make another input available.
- *
- * @return true if another key/value(s) pair exists, false if there are no
- * more.
- * @throws IOException
- * if an error occurs, including interruption while waiting for input
- */
- @Override
- public boolean next() throws IOException {
- if (readNextFromCurrentReader()) {
- return true;
- }
- // Current input exhausted (or none opened yet): advance until a record is
- // found or the manager reports that no inputs remain.
- while (moveToNextInput()) {
- if (readNextFromCurrentReader()) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Returns the current key with a single-use iterable over its one value.
- */
- @SuppressWarnings("unchecked")
- @Override
- public KVRecord getCurrentKV() throws IOException {
- this.valueIterator.setValue(value);
- return new KVRecord((Object)key, (Iterable<Object>)this.valueIterable);
- }
-
- /**
- * Tries reading the next key and value from the current reader.
- * @return true if the current reader produced another record; false if there is
- * no current reader or it is exhausted
- * @throws IOException
- */
- private boolean readNextFromCurrentReader() throws IOException {
- if (this.currentReader == null) {
- return false;
- }
- if (!this.currentReader.nextRawKey(keyIn)) {
- return false;
- }
- this.currentReader.nextRawValue(valIn);
- // Deserializers were opened on keyIn/valIn; reuse the previous objects.
- this.key = keyDeserializer.deserialize(this.key);
- this.value = valDeserializer.deserialize(this.value);
- return true;
- }
-
- /**
- * Moves to the next available input. This method may block if the input is not ready yet.
- * Also takes care of closing and freeing the previous input.
- *
- * @return true if the next input exists, false otherwise
- * @throws IOException if closing the previous input fails, or if interrupted while
- * waiting for the next one
- */
- private boolean moveToNextInput() throws IOException {
- if (currentReader != null) {
- // Clear the references first so a failure below can never lead a later call
- // to close the reader or free the input a second time.
- IFile.Reader readerToClose = currentReader;
- FetchedInput inputToFree = currentFetchedInput;
- currentReader = null;
- currentFetchedInput = null;
- try {
- readerToClose.close();
- } finally {
- inputToFree.free();
- }
- }
- try {
- currentFetchedInput = shuffleManager.getNextInput();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for next available input", e);
- // Preserve the interrupt for callers further up the stack.
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- if (currentFetchedInput == null) {
- return false; // No more inputs
- }
- currentReader = openIFileReader(currentFetchedInput);
- return true;
- }
-
- /**
- * Opens an IFile reader over a fetched input: an in-memory reader for MEMORY
- * inputs, otherwise a stream-based reader using the configured codec.
- */
- public IFile.Reader openIFileReader(FetchedInput fetchedInput)
- throws IOException {
- if (fetchedInput.getType() == Type.MEMORY) {
- MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
-
- return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
- mfi.getBytes(), 0, (int) mfi.getSize());
- } else {
- return new IFile.Reader(conf, fetchedInput.getInputStream(),
- fetchedInput.getSize(), codec, null);
- }
- }
-
- // TODO NEWTEZ Move this into a common class. Also used in MRInput
- /** Iterator that yields the value set via setValue() exactly once. */
- private class SimpleValueIterator implements Iterator<V> {
-
- private V value;
-
- public void setValue(V value) {
- this.value = value;
- }
-
- @Override
- public boolean hasNext() {
- return value != null;
- }
-
- @Override
- public V next() {
- V value = this.value;
- this.value = null;
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- /** Iterable that always hands back the same underlying iterator. */
- private class SimpleIterable implements Iterable<V> {
- private final Iterator<V> iterator;
- public SimpleIterable(Iterator<V> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<V> iterator() {
- return iterator;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
deleted file mode 100644
index e89e892..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.api.events.InputFailedEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleInputEventHandler;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class BroadcastShuffleInputEventHandler {
-
- private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
-
- private final BroadcastShuffleManager shuffleManager;
-
- /**
- * @param inputContext context of the owning input (not used by this handler)
- * @param shuffleManager manager told about known, empty, and obsolete inputs
- */
- public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
- this.shuffleManager = shuffleManager;
- }
-
- /**
- * Dispatches each event, in order, to the shuffle manager.
- *
- * @throws TezUncheckedException for event types other than DataMovementEvent
- * and InputFailedEvent, or an unparseable DataMovementEvent payload
- */
- public void handleEvents(List<Event> events) {
- for (Event event : events) {
- handleEvent(event);
- }
- }
-
- private void handleEvent(Event event) {
- if (event instanceof DataMovementEvent) {
- processDataMovementEvent((DataMovementEvent)event);
- } else if (event instanceof InputFailedEvent) {
- processInputFailedEvent((InputFailedEvent)event);
- } else {
- throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
- }
- }
-
- /**
- * Registers the described input with the manager: as a fetchable input when the
- * payload says output was generated, otherwise as completed with no data.
- */
- private void processDataMovementEvent(DataMovementEvent dme) {
- // Only source index 0 is accepted (presumably a broadcast output exposes a
- // single partition). The message is formatted only if the check fails.
- Preconditions.checkArgument(dme.getSourceIndex() == 0,
- "Unexpected srcIndex: %s on DataMovementEvent. Can only be 0",
- dme.getSourceIndex());
- DataMovementEventPayloadProto shufflePayload;
- try {
- shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
- } catch (InvalidProtocolBufferException e) {
- throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
- }
- if (shufflePayload.getOutputGenerated()) {
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
- dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
- shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
- srcAttemptIdentifier, 0);
- } else {
- shuffleManager.addCompletedInputWithNoData(
- new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
- }
- }
-
- /** Marks the failed source attempt as obsolete so it is not fetched. */
- private void processInputFailedEvent(InputFailedEvent ife) {
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
- shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
deleted file mode 100644
index 7b205fa..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.InputReadErrorEvent;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.InputIdentifier;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.shuffle.common.FetchResult;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.Fetcher;
-import org.apache.tez.engine.shuffle.common.FetcherCallback;
-import org.apache.tez.engine.shuffle.common.InputHost;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-import org.apache.tez.engine.shuffle.common.Fetcher.FetcherBuilder;
-import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class BroadcastShuffleManager implements FetcherCallback {
-
- private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
-
- private final TezInputContext inputContext;
- private final int numInputs;
- private final Configuration conf;
-
- private final BroadcastShuffleInputEventHandler inputEventHandler;
- private final FetchedInputAllocator inputManager;
-
- private final ExecutorService fetcherRawExecutor;
- private final ListeningExecutorService fetcherExecutor;
-
- private final BlockingQueue<FetchedInput> completedInputs;
- private final Set<InputIdentifier> completedInputSet;
- private final Set<InputIdentifier> pendingInputs;
- private final ConcurrentMap<String, InputHost> knownSrcHosts;
- private final Set<InputHost> pendingHosts;
- private final Set<InputAttemptIdentifier> obsoletedInputs;
-
- private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
-
- private final long startTime;
- private long lastProgressTime;
-
- private FutureTask<Void> runShuffleFuture;
-
- // Required to be held when manipulating pendingHosts, and when awaiting or
- // signalling wakeLoop: Condition methods need the owning ReentrantLock to be
- // locked; a synchronized block on the lock object does not count.
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition wakeLoop = lock.newCondition();
-
- private final int numFetchers;
- private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
-
- // Parameters required by Fetchers
- private final SecretKey shuffleSecret;
- private final int connectionTimeout;
- private final int readTimeout;
- private final CompressionCodec codec;
- // NOTE(review): this single decompressor is handed to every fetcher, and fetchers
- // run concurrently - confirm Fetcher does not use it from multiple threads.
- private final Decompressor decompressor;
-
- private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-
- private volatile Throwable shuffleError;
-
- // TODO NEWTEZ Add counters.
-
- public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
- this.inputContext = inputContext;
- this.conf = conf;
- this.numInputs = numInputs;
-
- this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
- this.inputManager = new BroadcastInputManager(inputContext, conf);
-
- // LinkedBlockingQueue and fixed thread pools reject a size of 0. Size them for
- // at least one element; with numInputs == 0 the run loop simply exits at once.
- int capacity = Math.max(1, numInputs);
- pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(capacity));
- completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(capacity));
- completedInputs = new LinkedBlockingQueue<FetchedInput>(capacity);
- knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
- pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
- obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-
- int maxConfiguredFetchers =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-
- this.numFetchers = Math.max(1, Math.min(maxConfiguredFetchers, numInputs));
-
- this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
- .build());
- this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-
- this.startTime = System.currentTimeMillis();
- this.lastProgressTime = startTime;
-
- this.shuffleSecret = ShuffleUtils
- .getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
-
- this.connectionTimeout = conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
- this.readTimeout = conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
-
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- codec = null;
- decompressor = null;
- }
- }
-
- /**
- * Starts the background scheduling loop on a daemon thread named "ShuffleRunner".
- */
- public void run() {
- RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
- runShuffleFuture = new FutureTask<Void>(callable);
- Thread runShuffleThread = new Thread(runShuffleFuture, "ShuffleRunner");
- // Daemon, like the fetcher threads, so a stuck loop cannot keep the JVM alive.
- runShuffleThread.setDaemon(true);
- runShuffleThread.start();
- }
-
- /**
- * Scheduling loop: waits until a fetcher slot and a pending host are both
- * available, then starts fetchers, until every input completes or a fetcher fails.
- */
- private class RunBroadcastShuffleCallable implements Callable<Void> {
-
- @Override
- public Void call() throws Exception {
- while (numCompletedInputs.get() < numInputs) {
- lock.lock();
- try {
- // The predicate is re-checked under the lock, so a signal sent between the
- // check and await() cannot be lost, and spurious wakeups are tolerated.
- while ((numRunningFetchers.get() >= numFetchers || pendingHosts.isEmpty())
- && numCompletedInputs.get() < numInputs && shuffleError == null) {
- wakeLoop.await();
- }
- if (shuffleError != null) {
- // InputContext has already been informed of a fatal error.
- // Initiate shutdown.
- break;
- }
- if (numCompletedInputs.get() < numInputs) {
- scheduleFetchers();
- }
- } finally {
- lock.unlock();
- }
- }
- // TODO NEWTEZ Maybe clean up inputs.
- if (!fetcherExecutor.isShutdown()) {
- fetcherExecutor.shutdownNow();
- }
- return null;
- }
- }
-
- /**
- * Starts fetchers for pending hosts, up to the number of free fetcher slots.
- * Hosts are removed from pendingHosts as they are visited; hosts with nothing
- * pending are dropped without a fetcher. Must be called with {@code lock} held.
- */
- private void scheduleFetchers() {
- int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
- int count = 0;
- for (Iterator<InputHost> inputHostIter = pendingHosts.iterator();
- count < numFetchersToRun && inputHostIter.hasNext(); ) {
- InputHost inputHost = inputHostIter.next();
- inputHostIter.remove();
- if (inputHost.getNumPendingInputs() > 0) {
- Fetcher fetcher = constructFetcherForHost(inputHost);
- numRunningFetchers.incrementAndGet();
- ListenableFuture<FetchResult> future = fetcherExecutor
- .submit(fetcher);
- Futures.addCallback(future, fetchFutureCallback);
- count++;
- }
- }
- }
-
- /**
- * Builds a fetcher for all of the host's pending inputs, minus those that have
- * already completed or have been marked obsolete.
- */
- private Fetcher constructFetcherForHost(InputHost inputHost) {
- FetcherBuilder fetcherBuilder = new FetcherBuilder(
- BroadcastShuffleManager.this, inputManager,
- inputContext.getApplicationId(), shuffleSecret, conf);
- fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
- fetcherBuilder.setCompressionParameters(codec, decompressor);
-
- // Drain the host's pending list exactly once, then filter it in place. The
- // filtered list is the one handed to the fetcher.
- List<InputAttemptIdentifier> pendingInputsForHost = inputHost
- .clearAndGetPendingInputs();
- for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
- .iterator(); inputIter.hasNext();) {
- InputAttemptIdentifier input = inputIter.next();
- // Consume the OBSOLETE marker if present. Drop the attempt if it is obsolete
- // or its input already completed (removing at most once per element).
- boolean obsolete = obsoletedInputs.remove(input);
- if (obsolete || completedInputSet.contains(input.getInputIdentifier())) {
- inputIter.remove();
- }
- }
- // TODO NEWTEZ Maybe limit the number of inputs being given to a single
- // fetcher, especially in the case where #hosts < #fetchers
- fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
- pendingInputsForHost);
- return fetcherBuilder.build();
- }
-
- /** Wakes the scheduling loop so it re-evaluates its wait condition. */
- private void signalWakeLoop() {
- lock.lock();
- try {
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
-
- /////////////////// Methods for InputEventHandler
-
- public void addKnownInput(String hostName, int port,
- InputAttemptIdentifier srcAttemptIdentifier, int partition) {
- InputHost host = knownSrcHosts.get(hostName);
- if (host == null) {
- host = new InputHost(hostName, port, inputContext.getApplicationId());
- InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
- if (old != null) {
- host = old;
- }
- }
- host.addKnownInput(srcAttemptIdentifier);
- lock.lock();
- try {
- pendingHosts.add(host);
- wakeLoop.signal();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Marks an input that produced no data as complete. Each logical input is counted
- * at most once, using the same guard as fetchSucceeded.
- */
- public void addCompletedInputWithNoData(
- InputAttemptIdentifier srcAttemptIdentifier) {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
- LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
- // Gate on completedInputSet: pendingInputs is never populated in this class, so
- // gating on pendingInputs.remove() meant empty inputs were never counted.
- synchronized (completedInputSet) {
- if (completedInputSet.add(inputIdentifier)) {
- pendingInputs.remove(inputIdentifier);
- completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
- numCompletedInputs.incrementAndGet();
- }
- }
-
- // Awake the loop to check for termination.
- signalWakeLoop();
- }
-
- public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
- obsoletedInputs.add(srcAttemptIdentifier);
- // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
- }
-
-
- public void handleEvents(List<Event> events) {
- inputEventHandler.handleEvents(events);
- }
-
- /////////////////// End of Methods for InputEventHandler
- /////////////////// Methods from FetcherCallbackHandler
-
- @Override
- public void fetchSucceeded(String host,
- InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
- long copyDuration) throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
- }
-
- // Count irrespective of whether this is a copy of an already fetched input
- lock.lock();
- try {
- lastProgressTime = System.currentTimeMillis();
- } finally {
- lock.unlock();
- }
-
- boolean committed = false;
- if (!completedInputSet.contains(inputIdentifier)) {
- synchronized (completedInputSet) {
- if (!completedInputSet.contains(inputIdentifier)) {
- fetchedInput.commit();
- committed = true;
- pendingInputs.remove(inputIdentifier);
- completedInputSet.add(inputIdentifier);
- completedInputs.add(fetchedInput);
- numCompletedInputs.incrementAndGet();
- }
- }
- }
- if (!committed) {
- fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
- } else {
- // Signal the wakeLoop to check for termination.
- signalWakeLoop();
- }
- // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
- }
-
- @Override
- public void fetchFailed(String host,
- InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
- // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
- // For now, reporting immediately.
- InputReadErrorEvent readError = new InputReadErrorEvent(
- "Fetch failure while fetching from "
- + TezEngineUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(),
- srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
- srcAttemptIdentifier.getAttemptNumber()),
- srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
- srcAttemptIdentifier.getAttemptNumber());
-
- List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(readError);
- inputContext.sendEvents(failedEvents);
- }
- /////////////////// End of Methods from FetcherCallbackHandler
-
- /**
- * Stops the scheduling loop and the fetchers. Running fetchers get up to two
- * seconds to finish before they are interrupted.
- */
- public void shutdown() throws InterruptedException {
- if (runShuffleFuture != null) {
- // Interrupts the scheduling loop if it is still waiting; no-op if it is done.
- runShuffleFuture.cancel(true);
- }
- if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
- this.fetcherExecutor.shutdown();
- // isShutdown() is always true after shutdown(); use the termination result.
- if (!this.fetcherExecutor.awaitTermination(2000L, TimeUnit.MILLISECONDS)) {
- this.fetcherExecutor.shutdownNow();
- }
- }
- }
-
- /////////////////// Methods for walking the available inputs
-
- /**
- * @return true if the head of the completed queue is a real (non-empty) input.
- */
- public boolean newInputAvailable() {
- FetchedInput head = completedInputs.peek();
- if (head == null || head instanceof NullFetchedInput) {
- return false;
- } else {
- return true;
- }
- }
-
- /**
- * @return true if all of the required inputs have been fetched.
- */
- public boolean allInputsFetched() {
- return numCompletedInputs.get() == numInputs;
- }
-
- /**
- * @return the next available input, or null if there are no available inputs.
- * This method will block if there are currently no available inputs,
- * but more may become available. Placeholders for inputs with no
- * data are skipped.
- */
- public FetchedInput getNextInput() throws InterruptedException {
- FetchedInput input = null;
- do {
- input = completedInputs.peek();
- if (input == null) {
- if (allInputsFetched()) {
- break;
- } else {
- input = completedInputs.take(); // block
- }
- } else {
- input = completedInputs.poll();
- }
- } while (input instanceof NullFetchedInput);
- return input;
- }
-
- /////////////////// End of methods for walking the available inputs
-
-
- /**
- * Fake input that is added to the completed input list in case an input does not have any data.
- *
- */
- private class NullFetchedInput extends FetchedInput {
-
- public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
- super(Type.MEMORY, -1, inputAttemptIdentifier, null);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void commit() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void abort() throws IOException {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
-
- @Override
- public void free() {
- throw new UnsupportedOperationException("Not supported for NullFetchedInput");
- }
- }
-
-
- private class FetchFutureCallback implements FutureCallback<FetchResult> {
-
- private void doBookKeepingForFetcherComplete() {
- numRunningFetchers.decrementAndGet();
- signalWakeLoop();
- }
-
- @Override
- public void onSuccess(FetchResult result) {
- Iterable<InputAttemptIdentifier> unfetchedInputs = result.getPendingInputs();
- if (unfetchedInputs != null && unfetchedInputs.iterator().hasNext()) {
- InputHost inputHost = knownSrcHosts.get(result.getHost());
- assert inputHost != null;
- for (InputAttemptIdentifier input : unfetchedInputs) {
- inputHost.addKnownInput(input);
- }
- // pendingHosts is only manipulated under the lock.
- lock.lock();
- try {
- pendingHosts.add(inputHost);
- } finally {
- lock.unlock();
- }
- }
- doBookKeepingForFetcherComplete();
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Pass the throwable separately so the stack trace is logged.
- LOG.error("Fetcher failed with error", t);
- shuffleError = t;
- inputContext.fatalError(t, "Fetch failed");
- doBookKeepingForFetcherComplete();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
deleted file mode 100644
index 474d1cd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.broadcast.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
-import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-public class FileBasedKVWriter implements KVWriter {
-
- public static final int INDEX_RECORD_LENGTH = 24;
-
- private final Configuration conf;
- private int numRecords = 0;
-
- @SuppressWarnings("rawtypes")
- private Class keyClass;
- @SuppressWarnings("rawtypes")
- private Class valClass;
- private CompressionCodec codec;
- private FileSystem rfs;
- private IFile.Writer writer;
-
- private TezTaskOutput ouputFileManager;
-
- // TODO NEWTEZ Define Counters
- // Number of records
- // Time waiting for a write to complete, if that's possible.
- // Size of key-value pairs written.
-
- public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
- this.conf = TezUtils.createConfFromUserPayload(outputContext
- .getUserPayload());
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
- outputContext.getWorkDirs());
-
- this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
-
- // Setup serialization
- keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
- valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
-
- // Setup compression
- if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, this.conf);
- } else {
- codec = null;
- }
-
- this.ouputFileManager = TezEngineUtils.instantiateTaskOutputManager(conf,
- outputContext);
-
- initWriter();
- }
-
- /**
- * @return true if any output was generated. false otherwise
- * @throws IOException
- */
- public boolean close() throws IOException {
- this.writer.close();
- TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
- writer.getCompressedLength());
- TezSpillRecord sr = new TezSpillRecord(1);
- sr.putIndex(rec, 0);
-
- Path indexFile = ouputFileManager
- .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
- sr.writeToFile(indexFile, conf);
- return numRecords > 0;
- }
-
- @Override
- public void write(Object key, Object value) throws IOException {
- this.writer.append(key, value);
- numRecords++;
- }
-
- public void initWriter() throws IOException {
- Path outputFile = ouputFileManager.getOutputFileForWrite();
-
- // TODO NEWTEZ Consider making the buffer size configurable. Also consider
- // setting up an in-memory buffer which is occasionally flushed to disk so
- // that the output does not block.
-
- // TODO NEWTEZ maybe use appropriate counter
- this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
- codec, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
deleted file mode 100644
index f73adfd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class ConfigUtils {
-
- public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
- Configuration conf, Class<DefaultCodec> defaultValue) {
- Class<? extends CompressionCodec> codecClass = defaultValue;
- String name = conf
- .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
- if (name != null) {
- try {
- codecClass = conf.getClassByName(name).asSubclass(
- CompressionCodec.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Compression codec " + name
- + " was not found.", e);
- }
- }
- return codecClass;
- }
-
- public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
- Configuration conf, Class<DefaultCodec> defaultValue) {
- Class<? extends CompressionCodec> codecClass = defaultValue;
- String name = conf
- .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC);
- if (name != null) {
- try {
- codecClass = conf.getClassByName(name).asSubclass(
- CompressionCodec.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Compression codec " + name
- + " was not found.", e);
- }
- }
- return codecClass;
- }
-
-
- // TODO Move defaults over to a constants file.
-
- public static boolean shouldCompressIntermediateOutput(Configuration conf) {
- return conf.getBoolean(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
- }
-
- public static boolean isIntermediateInputCompressed(Configuration conf) {
- return conf.getBoolean(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
- }
-
- public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
- Class<V> retv = (Class<V>) conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
- Object.class);
- return retv;
- }
-
- public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
- Class<V> retv = (Class<V>) conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null,
- Object.class);
- return retv;
- }
-
- public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
- Class<K> retv = (Class<K>) conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
- Object.class);
- return retv;
- }
-
- public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
- Class<K> retv = (Class<K>) conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null,
- Object.class);
- return retv;
- }
-
- public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
- Class<? extends RawComparator> theClass = conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
- RawComparator.class);
- if (theClass != null)
- return ReflectionUtils.newInstance(theClass, conf);
- return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
- WritableComparable.class));
- }
-
- public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
- Class<? extends RawComparator> theClass = conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
- RawComparator.class);
- if (theClass != null)
- return ReflectionUtils.newInstance(theClass, conf);
- return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
- WritableComparable.class));
- }
-
-
-
- // TODO Fix name
- public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator(
- Configuration conf) {
- Class<? extends RawComparator> theClass = conf
- .getClass(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
- null, RawComparator.class);
- if (theClass == null) {
- return getIntermediateInputKeyComparator(conf);
- }
-
- return ReflectionUtils.newInstance(theClass, conf);
- }
-
- public static boolean useNewApi(Configuration conf) {
- return conf.getBoolean("mapred.mapper.new-api", false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
deleted file mode 100644
index 076807e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-
-/**
- * Container for a task number and an attempt number for the task.
- */
-@Private
-public class InputAttemptIdentifier {
-
- private final InputIdentifier inputIdentifier;
- private final int attemptNumber;
- private String pathComponent;
-
- public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
- this(new InputIdentifier(taskIndex), attemptNumber, null);
- }
-
- public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
- this.inputIdentifier = inputIdentifier;
- this.attemptNumber = attemptNumber;
- this.pathComponent = pathComponent;
- }
-
- public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
- this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
- }
-
- public InputIdentifier getInputIdentifier() {
- return this.inputIdentifier;
- }
-
- public int getAttemptNumber() {
- return attemptNumber;
- }
-
- public String getPathComponent() {
- return pathComponent;
- }
-
- // PathComponent does not need to be part of the hashCode and equals computation.
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + attemptNumber;
- result = prime * result
- + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
- if (attemptNumber != other.attemptNumber)
- return false;
- if (inputIdentifier == null) {
- if (other.inputIdentifier != null)
- return false;
- } else if (!inputIdentifier.equals(other.inputIdentifier))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
- + ", attemptNumber=" + attemptNumber + ", pathComponent="
- + pathComponent + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
deleted file mode 100644
index b694530..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common;
-
-public class InputIdentifier {
-
- private final int srcTaskIndex;
-
- public InputIdentifier(int srcTaskIndex) {
- this.srcTaskIndex = srcTaskIndex;
- }
-
- public int getSrcTaskIndex() {
- return this.srcTaskIndex;
- }
-
- @Override
- public int hashCode() {
- return srcTaskIndex;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- InputIdentifier other = (InputIdentifier) obj;
- if (srcTaskIndex != other.srcTaskIndex)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
deleted file mode 100644
index cc29e94..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezTaskContext;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-public class TezEngineUtils {
-
- private static final Log LOG = LogFactory
- .getLog(TezEngineUtils.class);
-
- public static String getTaskIdentifier(String vertexName, int taskIndex) {
- return String.format("%s_%06d", vertexName, taskIndex);
- }
-
- public static String getTaskAttemptIdentifier(int taskIndex,
- int taskAttemptNumber) {
- return String.format("%d_%d", taskIndex, taskAttemptNumber);
- }
-
- // TODO Maybe include a dag name in this.
- public static String getTaskAttemptIdentifier(String vertexName,
- int taskIndex, int taskAttemptNumber) {
- return String.format("%s_%06d_%02d", vertexName, taskIndex,
- taskAttemptNumber);
- }
-
- @SuppressWarnings("unchecked")
- public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
- Class<? extends Combiner> clazz;
- String className = conf.get(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS);
- if (className == null) {
- LOG.info("No combiner specified via " + TezJobConfig.TEZ_ENGINE_COMBINER_CLASS + ". Combiner will not be used");
- return null;
- }
- LOG.info("Using Combiner class: " + className);
- try {
- clazz = (Class<? extends Combiner>) conf.getClassByName(className);
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to load combiner class: " + className);
- }
-
- Combiner combiner = null;
-
- Constructor<? extends Combiner> ctor;
- try {
- ctor = clazz.getConstructor(TezTaskContext.class);
- combiner = ctor.newInstance(taskContext);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- return combiner;
- }
-
- @SuppressWarnings("unchecked")
- public static Partitioner instantiatePartitioner(Configuration conf)
- throws IOException {
- Class<? extends Partitioner> clazz;
- try {
- clazz = (Class<? extends Partitioner>) conf
- .getClassByName(conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS));
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to find Partitioner class in config", e);
- }
-
- LOG.info("Using partitioner class: " + clazz.getName());
-
- Partitioner partitioner = null;
-
- try {
- Constructor<? extends Partitioner> ctorWithConf = clazz
- .getConstructor(Configuration.class);
- partitioner = ctorWithConf.newInstance(conf);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- try {
- // Try a 0 argument constructor.
- partitioner = clazz.newInstance();
- } catch (InstantiationException e1) {
- throw new IOException(e1);
- } catch (IllegalAccessException e1) {
- throw new IOException(e1);
- }
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- return partitioner;
- }
-
- public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
- Class<?> clazz = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
- TezTaskOutputFiles.class);
- try {
- Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
- ctor.setAccessible(true);
- TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
- return instance;
- } catch (Exception e) {
- throw new TezUncheckedException(
- "Unable to instantiate configured TezOutputFileManager: "
- + conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
- TezTaskOutputFiles.class.getName()), e);
- }
- }
-}