You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:17 UTC
[08/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
new file mode 100644
index 0000000..10699ac
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/** Heartbeat reply: last request id, die flag and an optional event list. */
+public class TezHeartbeatResponse implements Writable {
+
+  private long lastRequestId;
+  private boolean shouldDie = false;
+  private List<TezEvent> events;
+
+  public TezHeartbeatResponse() {
+  }
+
+  public TezHeartbeatResponse(List<TezEvent> events) {
+    this.events = Collections.unmodifiableList(events);
+  }
+
+  /** @return the current events; null if none were set or deserialized. */
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  public long getLastRequestId() {
+    return lastRequestId;
+  }
+
+  public void setEvents(List<TezEvent> events) {
+    this.events = Collections.unmodifiableList(events);
+  }
+
+  public void setLastRequestId(long lastRequestId) {
+    this.lastRequestId = lastRequestId;
+  }
+
+  public void setShouldDie() {
+    this.shouldDie = true;
+  }
+
+  /** Writes id, die flag, an events-present flag, then count and events. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(lastRequestId);
+    out.writeBoolean(shouldDie);
+    out.writeBoolean(events != null);
+    if (events != null) {
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    lastRequestId = in.readLong();
+    shouldDie = in.readBoolean();
+    // Reset first: a reused instance must not keep events from an earlier read.
+    events = null;
+    if (in.readBoolean()) {
+      int eventCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventCount);
+      for (int i = 0; i < eventCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "{ " + " lastRequestId=" + lastRequestId + ", shouldDie=" + shouldDie
+        + ", eventCount=" + (events != null ? events.size() : 0) + " }";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
new file mode 100644
index 0000000..9169895
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/** {@link TezInputContext} implementation; events it sends carry INPUT metadata. */
+public class TezInputContextImpl extends TezTaskContextImpl
+    implements TezInputContext {
+  private final byte[] userPayload;
+  private final String sourceVertexName;
+  private final EventMetaData sourceInfo;
+
+  @Private
+  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String sourceVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload,
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    this.sourceVertexName = sourceVertexName;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+        taskVertexName, sourceVertexName, taskAttemptID);
+    // <dagId>_<vertex>_<task index, 6 digits>_<attempt, 2 digits>_<source vertex>
+    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(), sourceVertexName);
+  }
+
+  /** Wraps each event with this input's metadata and passes the batch on. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  @Override
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
new file mode 100644
index 0000000..fd4c3a3
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/** {@link TezOutputContext} implementation; events it sends carry OUTPUT metadata. */
+public class TezOutputContextImpl extends TezTaskContextImpl
+    implements TezOutputContext {
+  private final byte[] userPayload;
+  private final String destinationVertexName;
+  private final EventMetaData sourceInfo;
+
+  @Private
+  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String destinationVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    this.destinationVertexName = destinationVertexName;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        taskVertexName, destinationVertexName, taskAttemptID);
+    // <dagId>_<vertex>_<task index, 6 digits>_<attempt, 2 digits>_<dest vertex>
+    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        destinationVertexName);
+  }
+
+  /** Wraps each event with this output's metadata and passes the batch on. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  @Override
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
new file mode 100644
index 0000000..e73baf4
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/** {@link TezProcessorContext} implementation; events carry PROCESSOR metadata. */
+public class TezProcessorContextImpl extends TezTaskContextImpl
+    implements TezProcessorContext {
+  private final byte[] userPayload;
+  private final EventMetaData sourceInfo;
+
+  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String vertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, vertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
+        taskVertexName, "", taskAttemptID);
+    // <dagId>_<vertex>_<task index, 6 digits>_<attempt, 2 digits>
+    String dagId = taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber());
+  }
+
+  /** Wraps each event with this processor's metadata and passes the batch on. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  @Override
+  public void setProgress(float progress) {
+    runtimeTask.setProgress(progress);
+  }
+
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+  @Override
+  public boolean canCommit() throws IOException {
+    return tezUmbilical.canCommit(taskAttemptID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
new file mode 100644
index 0000000..ee9e96d
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.TezTaskContext;
+
+/** Base task context: most identifiers are derived from the task attempt ID. */
+public abstract class TezTaskContextImpl implements TezTaskContext {
+
+  private final Configuration conf;
+  protected final String taskVertexName;
+  protected final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private String[] workDirs;
+  protected String uniqueIdentifier;
+  protected final RuntimeTask runtimeTask;
+  protected final TezUmbilical tezUmbilical;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
+  private final int appAttemptNumber;
+
+  @Private
+  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
+      String taskVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, RuntimeTask runtimeTask,
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    this.conf = conf;
+    this.taskVertexName = taskVertexName;
+    this.taskAttemptID = taskAttemptID;
+    this.counters = counters;
+    this.runtimeTask = runtimeTask;
+    this.tezUmbilical = tezUmbilical;
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
+    // TODO NEWTEZ at some point dag attempt should not map to app attempt
+    this.appAttemptNumber = appAttemptNumber;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    this.workDirs = conf.getStrings(TezJobConfig.LOCAL_DIRS);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().getApplicationId();
+  }
+
+  @Override
+  public int getTaskIndex() {
+    return taskAttemptID.getTaskID().getId();
+  }
+
+  @Override
+  public int getDAGAttemptNumber() {
+    return appAttemptNumber;
+  }
+
+  @Override
+  public int getTaskAttemptNumber() {
+    return taskAttemptID.getId();
+  }
+
+  @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+
+  @Override
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  /** @return a copy of the work dirs, so callers cannot alter this context's array. */
+  @Override
+  public String[] getWorkDirs() {
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+
+  /** @return a rewound read-only view of the metadata stored for the service. */
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    ByteBuffer stored = serviceConsumerMetadata.get(serviceName);
+    return (ByteBuffer) stored.asReadOnlyBuffer().rewind();
+  }
+
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        serviceName, System.getenv());
+  }
+
+  /** Records the error on the task, then reports diagnostics via the umbilical. */
+  protected void signalFatalError(Throwable t, String message,
+      EventMetaData sourceInfo) {
+    runtimeTask.setFatalError(t, message);
+    final String diagnostics;
+    if (t == null) {
+      diagnostics = (message == null) ? "Unknown error"
+          : " errorMessage=" + message;
+    } else {
+      String trace = "exceptionThrown=" + StringUtils.stringifyException(t);
+      diagnostics = (message == null) ? trace
+          : trace + ", errorMessage=" + message;
+    }
+    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
new file mode 100644
index 0000000..addccda
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public interface TezUmbilical {
+  /** Hands a batch of events to the umbilical. */
+  void addEvents(Collection<TezEvent> events);
+
+  /** Reports a fatal error for the attempt, with diagnostics and its source. */
+  void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics,
+      EventMetaData sourceInfo);
+
+  /** NOTE(review): presumably asks whether the attempt may commit; confirm. */
+  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
new file mode 100644
index 0000000..a47526b
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+
+import com.google.inject.Singleton;
+
+@Singleton
+public class ObjectRegistryImpl implements ObjectRegistry {
+
+  private Map<String, Map.Entry<Object, ObjectLifeCycle>> objectCache =
+      new HashMap<String, Map.Entry<Object, ObjectLifeCycle>>();
+
+  /** Stores value under key; returns the object previously stored, or null. */
+  @Override
+  public synchronized Object add(ObjectLifeCycle lifeCycle,
+      String key, Object value) {
+    Map.Entry<Object, ObjectLifeCycle> oldEntry = objectCache.put(key,
+        new AbstractMap.SimpleImmutableEntry<Object, ObjectLifeCycle>(
+            value, lifeCycle));
+    return oldEntry != null ? oldEntry.getKey() : null;
+  }
+
+  @Override
+  public synchronized Object get(String key) {
+    Map.Entry<Object, ObjectLifeCycle> entry = objectCache.get(key);
+    return entry != null ? entry.getKey() : null;
+  }
+
+  @Override
+  public synchronized boolean delete(String key) {
+    return (null != objectCache.remove(key));
+  }
+
+  /** Removes every cached object that was added with the given life cycle. */
+  public synchronized void clearCache(ObjectLifeCycle lifeCycle) {
+    // Delete via the iterator; objectCache.remove() here would break iteration.
+    for (java.util.Iterator<Entry<Object, ObjectLifeCycle>> it =
+        objectCache.values().iterator(); it.hasNext();) {
+      if (it.next().getValue().equals(lifeCycle)) {
+        it.remove();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
new file mode 100644
index 0000000..97ccf7c
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+
+/** Guice module that binds ObjectRegistry to a single registry instance. */
+public class ObjectRegistryModule extends AbstractModule {
+
+  private final ObjectRegistry objectRegistry;
+
+  public ObjectRegistryModule(ObjectRegistry objectRegistry) {
+    this.objectRegistry = objectRegistry;
+  }
+
+  @VisibleForTesting
+  public ObjectRegistryModule() {
+    this(new ObjectRegistryImpl());
+  }
+
+  @Override
+  protected void configure() {
+    bind(ObjectRegistry.class).toInstance(objectRegistry);
+    requestStaticInjection(ObjectRegistryFactory.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/proto/Events.proto b/tez-runtime-internals/src/main/proto/Events.proto
new file mode 100644
index 0000000..558a2b3
--- /dev/null
+++ b/tez-runtime-internals/src/main/proto/Events.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.runtime.internals.api.events";
+option java_outer_classname = "SystemEventProtos";
+option java_generate_equals_and_hash = true;
+// Named for a failed task attempt; its only field is an optional diagnostics string.
+message TaskAttemptFailedEventProto {
+ optional string diagnostics = 1;
+}
+// Named for a completed task attempt; declares no fields.
+message TaskAttemptCompletedEventProto {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
new file mode 100644
index 0000000..35192e7
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestObjectRegistry {
+
+  @SuppressWarnings("unused")
+  @Before
+  public void setup() {
+    Injector injector = Guice.createInjector(new ObjectRegistryModule());
+  }
+
+  @Test
+  public void testBasicCRUD() {
+    ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+    Assert.assertNotNull(registry);
+    // Nothing is stored under "foo" yet.
+    Assert.assertNull(registry.get("foo"));
+    Assert.assertFalse(registry.delete("foo"));
+    Integer first = Integer.valueOf(1);
+    Assert.assertNull(registry.add(ObjectLifeCycle.DAG, "one", first));
+    Assert.assertEquals(first, registry.get("one"));
+    // Adding under an existing key hands back a previous value and replaces it.
+    Integer original = Integer.valueOf(2);
+    Integer replacement = Integer.valueOf(3);
+    Assert.assertNull(registry.add(ObjectLifeCycle.DAG, "two", original));
+    Assert.assertNotNull(registry.add(ObjectLifeCycle.SESSION, "two", replacement));
+    Assert.assertNotEquals(original, registry.get("two"));
+    Assert.assertEquals(replacement, registry.get("two"));
+    Assert.assertTrue(registry.delete("one"));
+    Assert.assertFalse(registry.delete("one"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
new file mode 100644
index 0000000..dcdabe1
--- /dev/null
+++ b/tez-runtime-library/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-runtime-library</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>ShufflePayloads.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
new file mode 100644
index 0000000..16f7a8f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Static helpers for comparing and copying the valid bytes held by
+ * {@link DataInputBuffer} and {@link DataOutputBuffer} instances.
+ *
+ * For a {@link DataInputBuffer} the valid bytes start at {@code getPosition()}
+ * and {@code getLength()} is the end index of the data, so the number of
+ * remaining bytes is {@code getLength() - getPosition()} (the same arithmetic
+ * {@link #copy(DataInputBuffer, DataOutputBuffer)} uses). For a
+ * {@link DataOutputBuffer} the valid bytes are {@code [0, getLength())}.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public class BufferUtils {
+
+ /**
+ * Compares the unread bytes of two input buffers.
+ *
+ * @param buf1 left-hand buffer
+ * @param buf2 right-hand buffer
+ * @return the value of {@code FastByteComparisons.compareTo} for the two
+ * byte ranges
+ */
+ public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ byte[] b2 = buf2.getData();
+ int s1 = buf1.getPosition();
+ int s2 = buf2.getPosition();
+ // getLength() is an end index; convert it to a byte count.
+ int l1 = buf1.getLength() - s1;
+ int l2 = buf2.getLength() - s2;
+ return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+ }
+
+ /**
+ * Compares the bytes written so far to two output buffers.
+ *
+ * @param buf1 left-hand buffer
+ * @param buf2 right-hand buffer
+ * @return the value of {@code FastByteComparisons.compareTo} for the two
+ * byte ranges
+ */
+ public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ byte[] b2 = buf2.getData();
+ int s1 = 0;
+ int s2 = 0;
+ int l1 = buf1.getLength();
+ int l2 = buf2.getLength();
+ return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+ }
+
+ /**
+ * Compares the unread bytes of an input buffer with the bytes written so
+ * far to an output buffer.
+ *
+ * @param buf1 left-hand (input) buffer
+ * @param buf2 right-hand (output) buffer
+ * @return the value of {@code FastByteComparisons.compareTo} for the two
+ * byte ranges
+ */
+ public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ byte[] b2 = buf2.getData();
+ int s1 = buf1.getPosition();
+ int s2 = 0;
+ // getLength() is an end index; convert it to a byte count.
+ int l1 = buf1.getLength() - s1;
+ int l2 = buf2.getLength();
+ return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+ }
+
+ /**
+ * Compares the bytes written so far to an output buffer with the unread
+ * bytes of an input buffer.
+ *
+ * The operands are passed in their declared order rather than delegating to
+ * the (input, output) overload with swapped arguments, which would return
+ * the result with the opposite sign.
+ *
+ * @param buf1 left-hand (output) buffer
+ * @param buf2 right-hand (input) buffer
+ * @return the value of {@code FastByteComparisons.compareTo} for the two
+ * byte ranges
+ */
+ public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
+ byte[] b1 = buf1.getData();
+ byte[] b2 = buf2.getData();
+ int s1 = 0;
+ int s2 = buf2.getPosition();
+ int l1 = buf1.getLength();
+ // getLength() is an end index; convert it to a byte count.
+ int l2 = buf2.getLength() - s2;
+ return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+ }
+
+ /**
+ * Resets {@code dst} and fills it with the unread bytes of {@code src}.
+ *
+ * @param src buffer to copy from; its position is not changed here
+ * @param dst buffer to overwrite
+ * @throws IOException if writing to {@code dst} fails
+ */
+ public static void copy(DataInputBuffer src, DataOutputBuffer dst)
+ throws IOException {
+ byte[] b1 = src.getData();
+ int s1 = src.getPosition();
+ int l1 = src.getLength();
+ dst.reset();
+ dst.write(b1, s1, l1 - s1);
+ }
+
+ /**
+ * Resets {@code dst} and fills it with the bytes written so far to
+ * {@code src}.
+ *
+ * @param src buffer to copy from
+ * @param dst buffer to overwrite
+ * @throws IOException if writing to {@code dst} fails
+ */
+ public static void copy(DataOutputBuffer src, DataOutputBuffer dst)
+ throws IOException {
+ byte[] b1 = src.getData();
+ int s1 = 0;
+ int l1 = src.getLength();
+ dst.reset();
+ dst.write(b1, s1, l1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
new file mode 100644
index 0000000..a372e01
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+/**
+ * Supplies a hash code for keys of type {@code KEY}.
+ *
+ * NOTE(review): the name suggests this is meant to be implemented alongside
+ * a key comparator; nothing visible here shows how it is consumed - confirm.
+ *
+ * @param <KEY> the key type
+ */
+public interface HashComparator<KEY> {
+
+ /**
+ * @param key the key to hash
+ * @return the hash code to use for {@code key}
+ */
+ int getHashCode(KEY key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
new file mode 100644
index 0000000..9c6b380
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage
+ * <pre>
+ * while (kvReader.next()) {
+ * KVRecord kvRecord = kvReader.getCurrentKV();
+ * Object key = kvRecord.getKey();
+ * Iterable&lt;Object&gt; values = kvRecord.getValues();
+ * }
+ * </pre>
+ */
+public interface KVReader extends Reader {
+
+ /**
+ * Moves to the next key/values(s) pair
+ *
+ * @return true if another key/value(s) pair exists, false if there are no more.
+ * @throws IOException
+ * if an error occurs
+ */
+ public boolean next() throws IOException;
+
+ /**
+ * Returns the current key/value(s) pair. Use {@link #next()} to advance.
+ *
+ * @return the record holding the current key and its values
+ * @throws IOException
+ * if an error occurs
+ */
+ public KVRecord getCurrentKV() throws IOException;
+
+ // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
+
+ // TODO NEWTEZ KVRecord which does not need to return a list!
+ // TODO NEWTEZ Parameterize this
+ /**
+ * Represents a key and an associated set of values. The key and the values
+ * are stored by reference; nothing is copied.
+ */
+ public static class KVRecord {
+
+ // Set once in the constructor; there are no setters.
+ private Object key;
+ private Iterable<Object> values;
+
+ /**
+ * @param key the key of this record
+ * @param values the values associated with {@code key}
+ */
+ public KVRecord(Object key, Iterable<Object> values) {
+ this.key = key;
+ this.values = values;
+ }
+
+ /**
+ * @return the key passed to the constructor
+ */
+ public Object getKey() {
+ return this.key;
+ }
+
+ /**
+ * @return the values passed to the constructor
+ */
+ public Iterable<Object> getValues() {
+ return this.values;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
new file mode 100644
index 0000000..ff952ed
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value pair based {@link Writer}: each call to
+ * {@link #write(Object, Object)} takes one key and one value.
+ */
+public interface KVWriter extends Writer {
+ /**
+ * Writes a key/value pair.
+ *
+ * @param key
+ * the key to write
+ * @param value
+ * the value to write
+ * @throws IOException
+ * if an error occurs
+ */
+ public void write(Object key, Object value) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
new file mode 100644
index 0000000..680c9b8
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.api;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition output
+ * key/value pairs.
+ *
+ * <p><b>Partitioner Initialization</b></p> The Partitioner class is picked up
+ * using the TEZ_RUNTIME_PARTITIONER_CLASS attribute in {@link TezJobConfig}
+ *
+ * TODO NEWTEZ Change construction to first check for a Constructor with a byte[] payload
+ *
+ * Partitioners need to provide a single argument ({@link Configuration})
+ * constructor or a 0 argument constructor. If both exist, preference is given
+ * to the single argument constructor. This is primarily for MR support.
+ *
+ * If using the configuration constructor, TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS
+ * will be set in the configuration, to indicate the max number of expected
+ * partitions.
+ *
+ */
+public interface Partitioner {
+
+ /**
+ * Get partition for given key/value.
+ * @param key key
+ * @param value value
+ * @param numPartitions number of partitions
+ * @return the partition chosen for the key/value pair (presumably an index
+ * in the range {@code [0, numPartitions)} - confirm with callers)
+ */
+ int getPartition(Object key, Object value, int numPartitions);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
new file mode 100644
index 0000000..cda52da
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+/**
+ * Decides whether a fetched broadcast input is held in memory or written to
+ * disk, and keeps a running total of the memory handed out so far.
+ */
+public class BroadcastInputManager implements FetchedInputAllocator,
+ FetchedInputCallback {
+
+ private final Configuration conf;
+
+ private final TezTaskOutputFiles fileNameAllocator;
+ private final LocalDirAllocator localDirAllocator;
+
+ // Configuration parameters
+ private final long memoryLimit;
+ private final long maxSingleShuffleLimit;
+
+ // Bytes currently reserved for in-memory inputs; updated under this object's monitor.
+ private long usedMemory = 0;
+
+ /**
+ * Reads the memory limits from {@code conf}.
+ *
+ * @param inputContext supplies the unique identifier used for output file names
+ * @param conf configuration holding the buffer and single-input percentages
+ * @throws IllegalArgumentException if either configured percentage is out of range
+ */
+ public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+ this.conf = conf;
+
+ this.fileNameAllocator = new TezTaskOutputFiles(conf,
+ inputContext.getUniqueIdentifier());
+ this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+ // Fraction of the task memory that may be used for fetched inputs: [0, 1].
+ final float inputBufferPercent = conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+ if (inputBufferPercent < 0.0 || inputBufferPercent > 1.0) {
+ throw new IllegalArgumentException("Invalid value for "
+ + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ + inputBufferPercent);
+ }
+
+ // Allow unit tests to fix Runtime memory
+ final long defaultTaskMemory =
+ Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE);
+ final long taskMemory =
+ conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, defaultTaskMemory);
+ this.memoryLimit = (long) (taskMemory * inputBufferPercent);
+
+ // Fraction of the memory limit a single input may occupy: (0, 1].
+ final float singleInputPercent = conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ if (singleInputPercent <= 0.0f || singleInputPercent > 1.0f) {
+ throw new IllegalArgumentException("Invalid value for "
+ + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ + singleInputPercent);
+ }
+
+ this.maxSingleShuffleLimit = (long) (memoryLimit * singleInputPercent);
+ }
+
+ /**
+ * Returns an in-memory holder when {@code size} fits both the single-input
+ * limit and the remaining memory budget, otherwise a disk-backed holder.
+ * Only in-memory allocations are counted against the budget.
+ */
+ @Override
+ public synchronized FetchedInput allocate(long size,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+ final boolean fitsInMemory = size <= maxSingleShuffleLimit
+ && this.usedMemory + size <= this.memoryLimit;
+ if (fitsInMemory) {
+ this.usedMemory += size;
+ return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+ }
+ return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
+ localDirAllocator, fileNameAllocator);
+ }
+
+ /**
+ * Nothing is tracked on completion; only the input type is validated.
+ */
+ @Override
+ public void fetchComplete(FetchedInput fetchedInput) {
+ switch (fetchedInput.getType()) {
+ case MEMORY:
+ case DISK:
+ // Not tracking anything here.
+ break;
+ default:
+ throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ + " not expected for Broadcast fetch");
+ }
+ }
+
+ @Override
+ public void fetchFailed(FetchedInput fetchedInput) {
+ cleanup(fetchedInput);
+ }
+
+ @Override
+ public void freeResources(FetchedInput fetchedInput) {
+ cleanup(fetchedInput);
+ }
+
+ /**
+ * Returns the memory reserved for an in-memory input to the budget; disk
+ * inputs hold no reservation.
+ */
+ private void cleanup(FetchedInput fetchedInput) {
+ switch (fetchedInput.getType()) {
+ case MEMORY:
+ releaseMemory(fetchedInput.getSize());
+ break;
+ case DISK:
+ break;
+ default:
+ throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+ + " not expected for Broadcast fetch");
+ }
+ }
+
+ private synchronized void releaseMemory(long size) {
+ this.usedMemory -= size;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
new file mode 100644
index 0000000..16e9645
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+
+public class BroadcastKVReader<K, V> implements KVReader {
+
+ private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
+
+ private final BroadcastShuffleManager shuffleManager;
+ private final Configuration conf;
+ private final CompressionCodec codec;
+
+ private final Class<K> keyClass;
+ private final Class<V> valClass;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
+ private final DataInputBuffer keyIn;
+ private final DataInputBuffer valIn;
+
+ private final SimpleValueIterator valueIterator;
+ private final SimpleIterable valueIterable;
+
+ private K key;
+ private V value;
+
+ private FetchedInput currentFetchedInput;
+ private IFile.Reader currentReader;
+
+
+ /**
+ * Creates a reader over the inputs handed out by {@code shuffleManager}.
+ *
+ * The codec is created only when intermediate input compression is enabled
+ * in {@code conf}; otherwise it is left {@code null}. The key and value
+ * deserializers are opened on {@code keyIn} and {@code valIn}, the buffers
+ * that each record is read into before being deserialized.
+ *
+ * @param shuffleManager source of fetched inputs
+ * @param conf configuration holding the key/value classes and codec settings
+ * @throws IllegalStateException if a deserializer cannot be opened
+ */
+ public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
+ Configuration conf) {
+ this.shuffleManager = shuffleManager;
+ this.conf = conf;
+
+ if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
+ Class<? extends CompressionCodec> codecClass = ConfigUtils
+ .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ codec = null;
+ }
+
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ // Values have their own configured class; reading the key class here would
+ // pick the wrong deserializer whenever key and value types differ.
+ this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+ this.keyIn = new DataInputBuffer();
+ this.valIn = new DataInputBuffer();
+
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ // A deserializer must be opened on its source stream before deserialize()
+ // is called; bind each one to the buffer the raw bytes are read into.
+ try {
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer.open(valIn);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Unable to open key/value deserializers", e);
+ }
+
+ this.valueIterator = new SimpleValueIterator();
+ this.valueIterable = new SimpleIterable(this.valueIterator);
+ }
+
+ // TODO NEWTEZ Maybe add an interface to check whether next will block.
+
+ /**
+ * Advances to the next key/value(s) pair, moving on to further inputs when
+ * the current one is exhausted.
+ *
+ * @return true if another key/value(s) pair was read, false once no input
+ * has any more records.
+ * @throws IOException
+ * if an error occurs
+ */
+ @Override
+ public boolean next() throws IOException {
+ if (readNextFromCurrentReader()) {
+ return true;
+ }
+ // Nothing left in the current input (or none opened yet): keep opening
+ // inputs until one yields a record or there are none left.
+ while (moveToNextInput()) {
+ if (readNextFromCurrentReader()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Wraps the most recently read key and value in a {@link KVRecord}. The
+ * record's values are backed by the shared single-value iterator, which is
+ * re-armed with the current value on every call.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public KVRecord getCurrentKV() throws IOException {
+ valueIterator.setValue(this.value);
+ Iterable<Object> currentValues = (Iterable<Object>) valueIterable;
+ return new KVRecord(this.key, currentValues);
+ }
+
+ /**
+ * Attempts to read one key and value from the reader that is currently open
+ * and deserializes them into {@code key} and {@code value}.
+ *
+ * @return true if a record was read; false if no reader is open yet or the
+ * current reader has no more records
+ * @throws IOException if reading or deserializing fails
+ */
+ private boolean readNextFromCurrentReader() throws IOException {
+ // No input has been opened yet.
+ if (currentReader == null) {
+ return false;
+ }
+ if (!currentReader.nextRawKey(keyIn)) {
+ return false;
+ }
+ currentReader.nextRawValue(valIn);
+ key = keyDeserializer.deserialize(key);
+ value = valDeserializer.deserialize(value);
+ return true;
+ }
+
+ /**
+ * Moves to the next available input. This method may block if the input is not ready yet.
+ * Also takes care of closing the previous input and releasing its resources.
+ *
+ * @return true if the next input exists, false otherwise
+ * @throws IOException if closing or opening an input fails, or if the thread
+ * is interrupted while waiting (the interrupt status is restored first)
+ */
+ private boolean moveToNextInput() throws IOException {
+ if (currentReader != null) { // Close the current reader.
+ try {
+ currentReader.close();
+ } finally {
+ // Release the input even if close() fails, and forget both references
+ // so a later call cannot close or free them a second time.
+ currentFetchedInput.free();
+ currentReader = null;
+ currentFetchedInput = null;
+ }
+ }
+ try {
+ currentFetchedInput = shuffleManager.getNextInput();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for next available input", e);
+ // Preserve the interrupt for callers further up the stack.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ if (currentFetchedInput == null) {
+ return false; // No more inputs
+ } else {
+ currentReader = openIFileReader(currentFetchedInput);
+ return true;
+ }
+ }
+
+ /**
+ * Opens a reader matching where the fetched input lives: an
+ * {@link InMemoryReader} over the byte array for in-memory inputs, otherwise
+ * an {@link IFile.Reader} over the input's stream using the configured codec.
+ *
+ * @param fetchedInput the input to read
+ * @return a reader positioned at the start of the input
+ * @throws IOException if the reader cannot be created
+ */
+ public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+ throws IOException {
+ if (fetchedInput.getType() != Type.MEMORY) {
+ return new IFile.Reader(conf, fetchedInput.getInputStream(),
+ fetchedInput.getSize(), codec, null);
+ }
+
+ MemoryFetchedInput inMemory = (MemoryFetchedInput) fetchedInput;
+ return new InMemoryReader(null, inMemory.getInputAttemptIdentifier(),
+ inMemory.getBytes(), 0, (int) inMemory.getSize());
+ }
+
+
+
+ // TODO NEWTEZ Move this into a common class. Also used in MRInput
+ /**
+ * Iterator over at most one value. {@link #setValue} arms it with a value;
+ * {@link #next()} hands that value out once and clears it. Calling
+ * {@link #next()} when nothing is pending returns {@code null}.
+ */
+ private class SimpleValueIterator implements Iterator<V> {
+
+ // The value still to be handed out, or null when there is none.
+ private V pending;
+
+ public void setValue(V value) {
+ this.pending = value;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return pending != null;
+ }
+
+ @Override
+ public V next() {
+ final V current = pending;
+ pending = null;
+ return current;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Iterable that returns the same iterator instance from every call to
+ * {@link #iterator()}; it does not create a fresh iterator per traversal.
+ */
+ private class SimpleIterable implements Iterable<V> {
+ private final Iterator<V> shared;
+
+ public SimpleIterable(Iterator<V> iterator) {
+ this.shared = iterator;
+ }
+
+ @Override
+ public Iterator<V> iterator() {
+ return shared;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
new file mode 100644
index 0000000..c64379a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandler;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Translates input events for a broadcast input into calls on the
+ * {@link BroadcastShuffleManager}.
+ */
+public class BroadcastShuffleInputEventHandler {
+
+ // Logger is named after this class so that its output is not attributed to
+ // the non-broadcast ShuffleInputEventHandler.
+ private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
+
+ private final BroadcastShuffleManager shuffleManager;
+
+ /**
+ * @param inputContext context of the owning input; currently not used here
+ * @param shuffleManager manager that is told about known, empty and obsolete inputs
+ */
+ public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
+ this.shuffleManager = shuffleManager;
+ }
+
+ /**
+ * Handles the given events one by one, in list order.
+ *
+ * @param events events to process
+ * @throws TezUncheckedException if an event has an unsupported type or a
+ * data movement payload cannot be parsed
+ */
+ public void handleEvents(List<Event> events) {
+ for (Event event : events) {
+ handleEvent(event);
+ }
+ }
+
+ // Dispatches on the concrete event type; anything else is rejected.
+ private void handleEvent(Event event) {
+ if (event instanceof DataMovementEvent) {
+ processDataMovementEvent((DataMovementEvent)event);
+ } else if (event instanceof InputFailedEvent) {
+ processInputFailedEvent((InputFailedEvent)event);
+ } else {
+ throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+ }
+ }
+
+
+ /**
+ * Registers the source described by a data movement event: as a known input
+ * at the payload's host/port when output was generated, otherwise as an
+ * input that completed without data. Only source index 0 is accepted.
+ */
+ private void processDataMovementEvent(DataMovementEvent dme) {
+ Preconditions.checkArgument(dme.getSourceIndex() == 0,
+ "Unexpected srcIndex: " + dme.getSourceIndex()
+ + " on DataMovementEvent. Can only be 0");
+ DataMovementEventPayloadProto shufflePayload;
+ try {
+ shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+ }
+ if (shufflePayload.getOutputGenerated()) {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
+ shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
+ } else {
+ shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+ }
+ }
+
+ // Tells the shuffle manager that the identified source attempt is obsolete.
+ private void processInputFailedEvent(InputFailedEvent ife) {
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+ shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
new file mode 100644
index 0000000..2a5c22f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.runtime.library.shuffle.common.FetchResult;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
+import org.apache.tez.runtime.library.shuffle.common.InputHost;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BroadcastShuffleManager implements FetcherCallback {
+
+ private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
+
+ // Context of the input being served; used for the application id, service
+ // metadata, event reporting and fatal-error reporting.
+ private TezInputContext inputContext;
+ // Total number of inputs expected; completion is detected by comparing
+ // numCompletedInputs against this value.
+ private int numInputs;
+ private Configuration conf;
+
+ // Translates incoming events into calls on this manager.
+ private final BroadcastShuffleInputEventHandler inputEventHandler;
+ // Handed to every Fetcher built by constructFetcherForHost.
+ private final FetchedInputAllocator inputManager;
+
+ // Raw fixed-size pool and its listening decorator; fetchers are submitted to the decorator.
+ private final ExecutorService fetcherRawExecutor;
+ private final ListeningExecutorService fetcherExecutor;
+
+ // Completed inputs waiting to be consumed through getNextInput().
+ private final BlockingQueue<FetchedInput> completedInputs;
+ // Identifiers of inputs that have already been completed; used to drop duplicates.
+ private final Set<InputIdentifier> completedInputSet;
+ // NOTE(review): no code in this class adds elements to this set, only removes
+ // them - confirm whether it was meant to be pre-populated.
+ private final Set<InputIdentifier> pendingInputs;
+ // Known source hosts, keyed by host name.
+ private final ConcurrentMap<String, InputHost> knownSrcHosts;
+ // Hosts that have inputs waiting for a fetcher to be scheduled.
+ private final Set<InputHost> pendingHosts;
+ // Attempts reported as failed/obsolete; filtered out when a fetcher's work list is built.
+ private final Set<InputAttemptIdentifier> obsoletedInputs;
+
+ private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+
+ private final long startTime;
+ // Updated whenever a fetch succeeds.
+ private long lastProgressTime;
+
+ // Wraps the RunBroadcastShuffleCallable created by run().
+ private FutureTask<Void> runShuffleFuture;
+
+ // Required to be held when manipulating pendingHosts
+ private ReentrantLock lock = new ReentrantLock();
+ // Signalled to wake the scheduling loop in RunBroadcastShuffleCallable.
+ private Condition wakeLoop = lock.newCondition();
+
+ // Upper bound on concurrently running fetchers; computed in the constructor
+ // from TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES and numInputs.
+ private final int numFetchers;
+ private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+
+ // Parameters required by Fetchers
+ private final SecretKey shuffleSecret;
+ private final int connectionTimeout;
+ private final int readTimeout;
+ // Both are null when intermediate input compression is disabled.
+ private final CompressionCodec codec;
+ private final Decompressor decompressor;
+
+ // Single callback instance attached to every submitted fetcher future.
+ private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+
+ // Set by FetchFutureCallback.onFailure; checked by the scheduling loop to stop early.
+ private volatile Throwable shuffleError;
+
+ // TODO NEWTEZ Add counters.
+
+ /**
+  * Creates a shuffle manager for a broadcast input.
+  *
+  * Sets up the event handler, the input allocator, the bookkeeping collections,
+  * the fetcher thread pool and the connection/compression parameters that are
+  * later handed to each Fetcher. No threads are started here; see run().
+  *
+  * @param inputContext context of the input this manager serves
+  * @param conf configuration to read shuffle settings from
+  * @param numInputs number of inputs that are expected to be fetched
+  * @throws IOException declared by the original signature; propagated from the
+  *         helpers invoked during setup
+  */
+ public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+   this.inputContext = inputContext;
+   this.conf = conf;
+   this.numInputs = numInputs;
+
+   this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
+   this.inputManager = new BroadcastInputManager(inputContext, conf);
+
+   pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+   completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+   // LinkedBlockingQueue rejects a capacity of 0, so keep it at least 1 even
+   // when there is nothing to fetch.
+   completedInputs = new LinkedBlockingQueue<FetchedInput>(Math.max(1, numInputs));
+   knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+   pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
+   obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+
+   int maxConfiguredFetchers =
+       conf.getInt(
+           TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+           TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+
+   // Never more fetchers than inputs, but at least one: newFixedThreadPool
+   // throws IllegalArgumentException for a non-positive pool size.
+   this.numFetchers = Math.max(1, Math.min(maxConfiguredFetchers, numInputs));
+
+   this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
+           .build());
+   this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+   this.startTime = System.currentTimeMillis();
+   this.lastProgressTime = startTime;
+
+   this.shuffleSecret = ShuffleUtils
+       .getJobTokenSecretFromTokenBytes(inputContext
+           .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+
+   // NOTE(review): the connect timeout falls back to the STALLED_COPY default;
+   // confirm that this is the intended default constant.
+   this.connectionTimeout = conf.getInt(
+       TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+       TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
+   this.readTimeout = conf.getInt(
+       TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+       TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+
+   if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+     Class<? extends CompressionCodec> codecClass = ConfigUtils
+         .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+     codec = ReflectionUtils.newInstance(codecClass, conf);
+     decompressor = CodecPool.getDecompressor(codec);
+   } else {
+     codec = null;
+     decompressor = null;
+   }
+ }
+
+ /**
+  * Starts the shuffle scheduling loop on a dedicated "ShuffleRunner" thread.
+  *
+  * The loop is wrapped in a FutureTask that is kept in runShuffleFuture. This
+  * method returns immediately after the thread has been started.
+  */
+ public void run() {
+   RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
+   runShuffleFuture = new FutureTask<Void>(callable);
+   // The thread must actually be started; constructing it alone runs nothing.
+   new Thread(runShuffleFuture, "ShuffleRunner").start();
+ }
+
+ /**
+  * Scheduling loop of the shuffle: waits until there is a pending host and a
+  * free fetcher slot, then submits fetchers until either runs out. The loop
+  * ends when all inputs are complete or a fetcher has reported a fatal error,
+  * after which the fetcher executor is shut down.
+  */
+ private class RunBroadcastShuffleCallable implements Callable<Void> {
+
+   /**
+    * Runs the scheduling loop.
+    *
+    * @return always null
+    * @throws Exception InterruptedException if interrupted while waiting, or
+    *         anything thrown while constructing/submitting a fetcher
+    */
+   @Override
+   public Void call() throws Exception {
+     while (numCompletedInputs.get() < numInputs) {
+       // A Condition may only be awaited while its ReentrantLock is held
+       // (lock.lock(), not synchronized). All wake-up conditions are re-checked
+       // under the lock so that a signal sent just before we wait is not lost.
+       lock.lock();
+       try {
+         boolean nothingToSchedule =
+             numRunningFetchers.get() >= numFetchers || pendingHosts.isEmpty();
+         if (nothingToSchedule && shuffleError == null
+             && numCompletedInputs.get() < numInputs) {
+           wakeLoop.await();
+         }
+       } finally {
+         lock.unlock();
+       }
+
+       if (shuffleError != null) {
+         // InputContext has already been informed of a fatal error.
+         // Initiate shutdown.
+         break;
+       }
+
+       // Scheduling happens whether or not we had to wait above; otherwise the
+       // loop would spin without ever submitting work that is already pending.
+       if (numCompletedInputs.get() < numInputs) {
+         scheduleFetchers();
+       }
+     }
+     // TODO NEWTEZ Maybe clean up inputs.
+     if (!fetcherExecutor.isShutdown()) {
+       fetcherExecutor.shutdownNow();
+     }
+     return null;
+   }
+
+   /**
+    * Submits fetchers for pending hosts, limited by the number of free fetcher
+    * slots. Hosts are removed from pendingHosts as they are visited.
+    */
+   private void scheduleFetchers() {
+     lock.lock();
+     try {
+       int numFetchersToRun =
+           Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
+       if (numFetchersToRun <= 0) {
+         // No free slot (or nothing pending): leave the hosts queued.
+         return;
+       }
+       int count = 0;
+       for (Iterator<InputHost> inputHostIter = pendingHosts.iterator(); inputHostIter.hasNext();) {
+         InputHost inputHost = inputHostIter.next();
+         inputHostIter.remove();
+         if (inputHost.getNumPendingInputs() > 0) {
+           Fetcher fetcher = constructFetcherForHost(inputHost);
+           numRunningFetchers.incrementAndGet();
+           ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
+           Futures.addCallback(future, fetchFutureCallback);
+           if (++count >= numFetchersToRun) {
+             break;
+           }
+         }
+       }
+     } finally {
+       lock.unlock();
+     }
+   }
+ }
+
+ /**
+  * Builds a Fetcher for the given host.
+  *
+  * The host's pending inputs are taken over, filtered to drop attempts whose
+  * input has already completed or that were marked obsolete, and the filtered
+  * list is assigned to the fetcher as its work.
+  *
+  * @param inputHost host whose pending inputs should be fetched
+  * @return a fetcher configured with connection, compression and work parameters
+  */
+ private Fetcher constructFetcherForHost(InputHost inputHost) {
+   FetcherBuilder fetcherBuilder = new FetcherBuilder(
+       BroadcastShuffleManager.this, inputManager,
+       inputContext.getApplicationId(), shuffleSecret, conf);
+   fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+   fetcherBuilder.setCompressionParameters(codec, decompressor);
+
+   // Remove obsolete inputs from the list being given to the fetcher. Also
+   // remove from the obsolete list.
+   List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+       .clearAndGetPendingInputs();
+   for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+       .iterator(); inputIter.hasNext();) {
+     InputAttemptIdentifier input = inputIter.next();
+     // Avoid adding attempts which have already completed.
+     boolean alreadyCompleted = completedInputSet.contains(input.getInputIdentifier());
+     // Avoid adding attempts which have been marked as OBSOLETE; remove() both
+     // tests for and clears the obsolete marker.
+     boolean obsolete = obsoletedInputs.remove(input);
+     // Remove at most once per element: a second Iterator.remove() for the same
+     // element would throw IllegalStateException.
+     if (alreadyCompleted || obsolete) {
+       inputIter.remove();
+     }
+   }
+   // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+   // fetcher, especially in the case where #hosts < #fetchers
+   // Hand over the filtered list; fetching the host's pending inputs a second
+   // time would discard the filtering done above.
+   fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+       pendingInputsForHost);
+   return fetcherBuilder.build();
+ }
+
+ /////////////////// Methods for InputEventHandler
+
+ /**
+  * Registers an input that is available for fetching from the given host and
+  * wakes the scheduling loop.
+  *
+  * Hosts are tracked by host name; the first registration for a name creates
+  * the InputHost entry.
+  *
+  * @param hostName name of the host serving the input
+  * @param port port used when the host entry is first created
+  * @param srcAttemptIdentifier the source attempt to fetch
+  * @param partition not used by this implementation
+  */
+ public void addKnownInput(String hostName, int port,
+     InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+   InputHost host = knownSrcHosts.get(hostName);
+   if (host == null) {
+     host = new InputHost(hostName, port, inputContext.getApplicationId());
+     InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
+     if (old != null) {
+       // Another thread registered the host first; use its instance.
+       host = old;
+     }
+   }
+   host.addKnownInput(srcAttemptIdentifier);
+   // Condition.signal() requires the ReentrantLock itself to be held; a
+   // synchronized block on the lock object does not acquire it.
+   lock.lock();
+   try {
+     pendingHosts.add(host);
+     wakeLoop.signal();
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Marks an input as complete when its source produced no data.
+  *
+  * A NullFetchedInput placeholder is queued so that completion accounting stays
+  * uniform. An input that has already been completed is ignored.
+  *
+  * @param srcAttemptIdentifier the source attempt that generated no output
+  */
+ public void addCompletedInputWithNoData(
+     InputAttemptIdentifier srcAttemptIdentifier) {
+   InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+   LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+   // De-duplicate on completedInputSet (same monitor as fetchSucceeded).
+   // pendingInputs cannot serve as the guard because nothing ever adds to it.
+   synchronized (completedInputSet) {
+     if (completedInputSet.add(inputIdentifier)) {
+       pendingInputs.remove(inputIdentifier);
+       // Queue before counting so a consumer that sees the final count also
+       // sees the queued element.
+       completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+       numCompletedInputs.incrementAndGet();
+     }
+   }
+
+   // Awake the loop to check for termination. The ReentrantLock itself must be
+   // held to signal its Condition.
+   lock.lock();
+   try {
+     wakeLoop.signal();
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Records a source attempt as obsolete so that it is skipped when the work
+  * list for a fetcher is built.
+  *
+  * @param srcAttemptIdentifier the attempt that should no longer be fetched
+  */
+ public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+   // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+   obsoletedInputs.add(srcAttemptIdentifier);
+ }
+
+
+ /**
+  * Forwards the given events to the input event handler of this manager.
+  *
+  * @param events events to process
+  */
+ public void handleEvents(List<Event> events) {
+   this.inputEventHandler.handleEvents(events);
+ }
+
+ /////////////////// End of Methods for InputEventHandler
+ /////////////////// Methods from FetcherCallbackHandler
+
+ /**
+  * Callback invoked by a fetcher when an attempt has been fetched.
+  *
+  * The first successful fetch for an input is committed and queued for
+  * consumption; any later copy of the same input is aborted.
+  *
+  * @param host host the data was fetched from (not used here)
+  * @param srcAttemptIdentifier the attempt that was fetched
+  * @param fetchedInput the fetched data
+  * @param fetchedBytes number of bytes fetched (not used here)
+  * @param copyDuration duration of the copy (not used here)
+  * @throws IOException if committing or aborting the fetched input fails
+  */
+ @Override
+ public void fetchSucceeded(String host,
+     InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
+     long copyDuration) throws IOException {
+   InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+   }
+
+   // Count irrespective of whether this is a copy of an already fetched input
+   lock.lock();
+   try {
+     lastProgressTime = System.currentTimeMillis();
+   } finally {
+     lock.unlock();
+   }
+
+   boolean committed = false;
+   // Cheap unsynchronized check first, then re-check under the monitor so that
+   // only one copy of an input is ever committed.
+   if (!completedInputSet.contains(inputIdentifier)) {
+     synchronized (completedInputSet) {
+       if (!completedInputSet.contains(inputIdentifier)) {
+         fetchedInput.commit();
+         committed = true;
+         pendingInputs.remove(inputIdentifier);
+         completedInputSet.add(inputIdentifier);
+         completedInputs.add(fetchedInput);
+         numCompletedInputs.incrementAndGet();
+       }
+     }
+   }
+   if (!committed) {
+     fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+   } else {
+     // Signal the wakeLoop to check for termination. Condition.signal()
+     // requires the ReentrantLock itself to be held, not its monitor.
+     lock.lock();
+     try {
+       wakeLoop.signal();
+     } finally {
+       lock.unlock();
+     }
+   }
+   // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+ }
+
+ /**
+  * Callback invoked by a fetcher when fetching an attempt failed. The failure
+  * is reported immediately as an InputReadErrorEvent through the input context.
+  *
+  * @param host host the fetch was attempted from (not used here)
+  * @param srcAttemptIdentifier the attempt that could not be fetched
+  * @param connectFailed whether the connection itself failed (not used here)
+  */
+ @Override
+ public void fetchFailed(String host,
+     InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+   // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+   // For now, reporting immediately.
+   final int srcTaskIndex = srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex();
+   final int attemptNumber = srcAttemptIdentifier.getAttemptNumber();
+
+   String diagnostics = "Fetch failure while fetching from "
+       + TezRuntimeUtils.getTaskAttemptIdentifier(
+           inputContext.getSourceVertexName(), srcTaskIndex, attemptNumber);
+
+   List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+   failedEvents.add(new InputReadErrorEvent(diagnostics, srcTaskIndex, attemptNumber));
+   inputContext.sendEvents(failedEvents);
+ }
+ /////////////////// End of Methods from FetcherCallbackHandler
+
+ public void shutdown() throws InterruptedException {
+ if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+ this.fetcherExecutor.shutdown();
+ this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
+ if (!this.fetcherExecutor.isShutdown()) {
+ this.fetcherExecutor.shutdownNow();
+ }
+ }
+ }
+
+ /////////////////// Methods for walking the available inputs
+
+ /**
+  * Reports whether the head of the completed-input queue is a real input.
+  *
+  * @return true if there is another input ready for consumption; false when
+  *         the queue is empty or its head is a NullFetchedInput placeholder.
+  */
+ public boolean newInputAvailable() {
+   final FetchedInput head = completedInputs.peek();
+   return head != null && !(head instanceof NullFetchedInput);
+ }
+
+ /**
+  * Compares the number of completed inputs with the expected number of inputs.
+  *
+  * @return true if all of the required inputs have been fetched.
+  */
+ public boolean allInputsFetched() {
+   final int completedSoFar = numCompletedInputs.get();
+   return completedSoFar == numInputs;
+ }
+
+ /**
+ * @return the next available input, or null if there are no available inputs.
+ * This method will block if there are currently no available inputs,
+ * but more may become available.
+ */
+ public FetchedInput getNextInput() throws InterruptedException {
+ FetchedInput input = null;
+ do {
+ input = completedInputs.peek();
+ if (input == null) {
+ if (allInputsFetched()) {
+ break;
+ } else {
+ input = completedInputs.take(); // block
+ }
+ } else {
+ input = completedInputs.poll();
+ }
+ } while (input instanceof NullFetchedInput);
+ return input;
+ }
+
+ /////////////////// End of methods for walking the available inputs
+
+
+ /**
+  * Placeholder entry for the completed input list, used when a source did not
+  * produce any data. It carries only the attempt identifier; every data or
+  * lifecycle operation is rejected with an UnsupportedOperationException.
+  */
+ private class NullFetchedInput extends FetchedInput {
+
+   // Shared text of the exception raised by every unsupported operation.
+   private static final String UNSUPPORTED_MESSAGE = "Not supported for NullFetchedInput";
+
+   public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+     super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+   }
+
+   @Override
+   public OutputStream getOutputStream() throws IOException {
+     throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+   }
+
+   @Override
+   public InputStream getInputStream() throws IOException {
+     throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+   }
+
+   @Override
+   public void commit() throws IOException {
+     throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+   }
+
+   @Override
+   public void abort() throws IOException {
+     throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+   }
+
+   @Override
+   public void free() {
+     throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+   }
+ }
+
+
+ /**
+  * Callback attached to every submitted fetcher future. It re-queues inputs a
+  * fetcher could not finish, records fatal fetcher errors, and always releases
+  * the fetcher slot and wakes the scheduling loop.
+  */
+ private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+   /** Releases the finished fetcher's slot and wakes the scheduling loop. */
+   private void doBookKeepingForFetcherComplete() {
+     numRunningFetchers.decrementAndGet();
+     // Condition.signal() requires the ReentrantLock itself to be held; a
+     // synchronized block on the lock object does not acquire it.
+     lock.lock();
+     try {
+       wakeLoop.signal();
+     } finally {
+       lock.unlock();
+     }
+   }
+
+   /**
+    * Puts any inputs the fetcher left unprocessed back onto their host and
+    * marks that host as pending again.
+    *
+    * @param result outcome reported by the finished fetcher
+    */
+   @Override
+   public void onSuccess(FetchResult result) {
+     try {
+       Iterable<InputAttemptIdentifier> remainingInputs = result.getPendingInputs();
+       if (remainingInputs != null && remainingInputs.iterator().hasNext()) {
+         InputHost inputHost = knownSrcHosts.get(result.getHost());
+         assert inputHost != null;
+         for (InputAttemptIdentifier input : remainingInputs) {
+           inputHost.addKnownInput(input);
+         }
+         // pendingHosts is only to be manipulated while holding the lock.
+         lock.lock();
+         try {
+           pendingHosts.add(inputHost);
+         } finally {
+           lock.unlock();
+         }
+       }
+     } finally {
+       // Always release the slot, even if re-queueing failed.
+       doBookKeepingForFetcherComplete();
+     }
+   }
+
+   /**
+    * Records the error, reports it as fatal to the input context and releases
+    * the fetcher slot.
+    *
+    * @param t the failure raised by the fetcher
+    */
+   @Override
+   public void onFailure(Throwable t) {
+     try {
+       // Pass the throwable as well so the stack trace is logged.
+       LOG.error("Fetcher failed with error: " + t, t);
+       shuffleError = t;
+       inputContext.fatalError(t, "Fetched failed");
+     } finally {
+       doBookKeepingForFetcherComplete();
+     }
+   }
+ }
+}