You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/25 23:18:45 UTC

tez git commit: TEZ-2740. Create a reconfigureVertex alias for deprecated setVertexParallelism API (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 24e17a4e5 -> a0c0727dd


TEZ-2740. Create a reconfigureVertex alias for deprecated setVertexParallelism API (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a0c0727d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a0c0727d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a0c0727d

Branch: refs/heads/master
Commit: a0c0727dda85841417326c319a1bc29fd22c88ab
Parents: 24e17a4
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 25 14:18:44 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 25 14:18:44 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../tez/dag/api/VertexManagerPluginContext.java | 37 ++++++++++++++++++++
 .../tez/dag/app/dag/impl/VertexManager.java     | 13 +++++++
 3 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a0c0727d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2c39e9..ff75713 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2740. Create a reconfigureVertex alias for deprecated
+  setVertexParallelism API
   TEZ-2690. Add critical path analyser
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers

http://git-wip-us.apache.org/repos/asf/tez/blob/a0c0727d/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 883387b..242bcee 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -167,6 +167,43 @@ public interface VertexManagerPluginContext {
       @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);
 
+  /**
+   * API to reconfigure a {@link Vertex} that is reading root inputs based on
+   * the data read from the root inputs. Root inputs are external data sources
+   * that provide the initial data for the DAG and are added to the
+   * {@link Vertex} using the
+   * {@link Vertex#addDataSource(String, DataSourceDescriptor)} API. Typically,
+   * the parallelism of such vertices is determined at runtime by gathering
+   * information about the data source. This API may be used to set the
+   * parallelism of the vertex at runtime based on the data sources, as well as
+   * changing the specification for those inputs. In addition, changing
+   * parallelism is often accompanied by changing the {@link EdgeProperty} of
+   * the source {@link Edge} because event routing between source and
+   * destination tasks may need to be updated to account for the new task
+   * parallelism. This method can be called to update the parallelism multiple
+   * times until any of the tasks of the vertex have been scheduled (by invoking
+   * {@link #scheduleTasks(List)}). If needed, the original source edge
+   * properties may be obtained via {@link #getInputVertexEdgeProperties()}.
+   * 
+   * @param parallelism
+   *          New number of tasks in the vertex
+   * @param locationHint
+   *          the placement policy for the tasks of the vertex, specified as a
+   *          {@link VertexLocationHint}
+   * @param sourceEdgeProperties
+   *          Map with Key=name of {@link Edge} to be updated and Value=
+   *          {@link EdgeManagerPluginDescriptor} for that edge. The name of the
+   *          Edge will be the corresponding source vertex name.
+   * @param rootInputSpecUpdate
+   *          The key of the map is the name of the data source and the value is
+   *          the updated {@link InputSpecUpdate} for that data source. If none
+   *          specified, a default value is used. See {@link InputSpecUpdate}
+   *          for details.
+   */
+  public void reconfigureVertex(int parallelism,
+      @Nullable VertexLocationHint locationHint,
+      @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties,
+      @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);
   
   /**
    * API to reconfigure a {@link Vertex} by changing its task parallelism. Task

http://git-wip-us.apache.org/repos/asf/tez/blob/a0c0727d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 247b92f..bb512a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -173,6 +173,19 @@ public class VertexManager {
     }
     
     @Override
+    public synchronized void reconfigureVertex(int parallelism, VertexLocationHint vertexLocationHint,
+        Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties,
+        Map<String, InputSpecUpdate> rootInputSpecUpdate) {
+      checkAndThrowIfDone();
+      try {
+        managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties,
+            rootInputSpecUpdate, true);
+      } catch (AMUserCodeException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+    
+    @Override
     public synchronized void reconfigureVertex(int parallelism,
         @Nullable VertexLocationHint locationHint,
         @Nullable Map<String, EdgeProperty> sourceEdgeProperties) {