You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/25 23:21:23 UTC

git commit: TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 2cee9acaa -> 51c8a8b71


TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51c8a8b7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51c8a8b7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51c8a8b7

Branch: refs/heads/master
Commit: 51c8a8b7187af3791f6985d046b428fcb30666d6
Parents: 2cee9ac
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 25 14:19:42 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 25 14:20:23 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/client/TezClientUtils.java   | 10 ++++
 .../main/java/org/apache/tez/dag/api/DAG.java   |  1 -
 .../apache/tez/dag/api/DataSinkDescriptor.java  | 48 ++++++++++++++++++++
 .../tez/dag/api/DataSourceDescriptor.java       | 44 ++++++++++++++++++
 .../java/org/apache/tez/dag/api/Vertex.java     |  7 ++-
 .../org/apache/tez/mapreduce/input/MRInput.java | 30 +++++++-----
 .../apache/tez/mapreduce/output/MROutput.java   | 16 ++++---
 8 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 44a450a..d7fa961 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@ ALL CHANGES:
   TEZ-1613. Decrease running time for TestAMRecovery
   TEZ-1240. Add system test for propagation of diagnostics for errors
   TEZ-1618. LocalTaskSchedulerService.getTotalResources() and getAvailableResources() can get negative if JVM memory is larger than 2GB
+  TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 1e01138..f011f60 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -83,6 +83,8 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -356,6 +358,14 @@ public class TezClientUtils {
         for (LocalResource lr: v.getTaskLocalFiles().values()) {
           lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
         }
+        List<DataSourceDescriptor> dataSources = v.getDataSources();
+        for (DataSourceDescriptor dataSource : dataSources) {
+          addFileSystemCredentialsFromURIs(dataSource.getURIsForCredentials(), dagCredentials, conf);
+        }
+        List<DataSinkDescriptor> dataSinks = v.getDataSinks();
+        for (DataSinkDescriptor dataSink : dataSinks) {
+          addFileSystemCredentialsFromURIs(dataSink.getURIsForCredentials(), dagCredentials, conf);
+        }
       }
       
       for (LocalResource lr: dag.getTaskLocalFiles().values()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c28f210..9b428f0 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -640,7 +640,6 @@ public class DAG {
         if (dataSource.getCredentials() != null) {
           dagCredentials.addAll(dataSource.getCredentials());
         }
-        vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles());
         if (dataSource.getAdditionalLocalFiles() != null) {
           TezCommonUtils.addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
index bc43c88..4d0d615 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -18,11 +18,18 @@
 
 package org.apache.tez.dag.api;
 
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.security.Credentials;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
 /**
  * Defines the output and output committer for a data sink 
  *
@@ -33,6 +40,7 @@ public class DataSinkDescriptor {
   private final OutputCommitterDescriptor committerDescriptor;
   
   private final Credentials credentials;
+  private final Collection<URI> urisForCredentials = Sets.newHashSet();
 
   /**
    * Create a {@link DataSinkDescriptor}
@@ -83,14 +91,54 @@ public class DataSinkDescriptor {
     return new DataSinkDescriptor(outputDescriptor, committerDescriptor, credentials);
   }
   
+  /**
+   * Get the {@link OutputDescriptor} for this {@link DataSinkDescriptor}
+   * @return {@link OutputDescriptor}
+   */
   public OutputDescriptor getOutputDescriptor() {
     return outputDescriptor;
   }
   
+  /**
+   * Get the {@link OutputCommitterDescriptor} for this {@link DataSinkDescriptor}
+   * @return {@link OutputCommitterDescriptor}
+   */
   public @Nullable OutputCommitterDescriptor getOutputCommitterDescriptor() {
     return committerDescriptor;
   }
   
+  /** 
+  * This method can be used to specify a list of URIs for which Credentials
+  * need to be obtained so that the job can run. An incremental list of URIs
+  * can be provided by making multiple calls to the method.
+  * 
+  * Currently, {@link Credentials} can only be fetched for HDFS and other
+  * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+  * credentials.
+  * 
+  * @param uris
+  *          a list of {@link URI}s
+  * @return this
+  */
+  public synchronized DataSinkDescriptor addURIsForCredentials(Collection<URI> uris) {
+    Preconditions.checkNotNull(uris, "URIs cannot be null");
+    urisForCredentials.addAll(uris);
+    return this;
+  }
+  
+  /**
+   * Get the URIs for which credentials will be obtained
+   * @return an unmodifiable collection of the URIs for which credentials
+   *         are required.
+   */
+  public Collection<URI> getURIsForCredentials() {
+    return Collections.unmodifiableCollection(urisForCredentials);
+  }
+
+  /**
+   * Get the {@link Credentials} for this {@link DataSinkDescriptor}
+   * @return {@link Credentials}
+   */
   public @Nullable Credentials getCredentials() {
     return credentials;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
index 78ddefa..1c5c16d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -20,9 +20,13 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -30,6 +34,9 @@ import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
 /**
  * Defines the input and input initializer for a data source 
  *
@@ -43,6 +50,7 @@ public class DataSourceDescriptor {
   private final int numShards;
   private final VertexLocationHint locationHint;
   private final Map<String, LocalResource> additionalLocalFiles;
+  private final Collection<URI> urisForCredentials = Sets.newHashSet();
 
   private DataSourceDescriptor(InputDescriptor inputDescriptor,
                                @Nullable InputInitializerDescriptor initializerDescriptor,
@@ -119,14 +127,50 @@ public class DataSourceDescriptor {
         locationHint, additionalLocalFiles);
   }
 
+  /**
+   * Get the {@link InputDescriptor} for this {@link DataSourceDescriptor} 
+   * @return {@link InputDescriptor}
+   */
   public InputDescriptor getInputDescriptor() {
     return inputDescriptor;
   }
   
+  /**
+   * Get the {@link InputInitializerDescriptor} for this {@link DataSourceDescriptor}
+   * @return {@link InputInitializerDescriptor}
+   */
   public @Nullable InputInitializerDescriptor getInputInitializerDescriptor() {
     return initializerDescriptor;
   }
   
+  /** 
+  * This method can be used to specify a list of URIs for which Credentials
+  * need to be obtained so that the job can run. An incremental list of URIs
+  * can be provided by making multiple calls to the method.
+  * 
+  * Currently, {@link Credentials} can only be fetched for HDFS and other
+  * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+  * credentials.
+  * 
+  * @param uris
+  *          a list of {@link URI}s
+  * @return this
+  */
+  public synchronized DataSourceDescriptor addURIsForCredentials(Collection<URI> uris) {
+    Preconditions.checkNotNull(uris, "URIs cannot be null");
+    urisForCredentials.addAll(uris);
+    return this;
+  }
+  
+  /**
+   * Get the URIs for which credentials will be obtained
+   * @return an unmodifiable collection of the URIs for which credentials
+   *         are required.
+   */
+  public Collection<URI> getURIsForCredentials() {
+    return Collections.unmodifiableCollection(urisForCredentials);
+  }
+  
   /**
    * Number of shards for this data source. If a vertex has only one
    * data source this the number of tasks in the vertex should be set to 

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 0dffbef..04acdaf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -440,11 +441,13 @@ public class Vertex {
     this.taskResource = resource;
   }
 
-  List<DataSourceDescriptor> getDataSources() {
+  @Private
+  public List<DataSourceDescriptor> getDataSources() {
     return dataSources;
   }
   
-  List<DataSinkDescriptor> getDataSinks() {
+  @Private
+  public List<DataSinkDescriptor> getDataSinks() {
     return dataSinks;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ddb001c..f38fc9c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -39,7 +40,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.Credentials;
-import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -267,7 +267,7 @@ public class MRInput extends MRInputBase {
 
       MRHelpers.translateMRConfToTez(conf);
 
-      Credentials credentials = maybeGetCredentials();
+      Collection<URI> uris = maybeGetURIsForCredentials();
 
       UserPayload payload = null;
       if (groupSplitsInAM) {
@@ -276,16 +276,20 @@ public class MRInput extends MRInputBase {
         payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
       }
 
-      return DataSourceDescriptor
+      DataSourceDescriptor ds = DataSourceDescriptor
           .create(InputDescriptor.create(inputClassName).setUserPayload(payload),
-              customInitializerDescriptor, credentials);
+              customInitializerDescriptor, null);
+      if (uris != null) {
+        ds.addURIsForCredentials(uris);
+      }
+      return ds;
     }
 
     private DataSourceDescriptor createGeneratorDataSource() throws IOException {
       setupBasicConf(conf);
       MRHelpers.translateMRConfToTez(conf);
       
-      Credentials credentials = maybeGetCredentials();
+      Collection<URI> uris = maybeGetURIsForCredentials();
 
       UserPayload payload = null;
       if (groupSplitsInAM) {
@@ -293,9 +297,13 @@ public class MRInput extends MRInputBase {
       } else {
         payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
       }
-      return DataSourceDescriptor.create(
+      DataSourceDescriptor ds = DataSourceDescriptor.create(
           InputDescriptor.create(inputClassName).setUserPayload(payload),
-          InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), credentials);
+          InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), null);
+      if (uris != null) {
+        ds.addURIsForCredentials(uris);
+      }
+      return ds;
     }
 
     private void setupBasicConf(Configuration inputConf) {
@@ -309,8 +317,7 @@ public class MRInput extends MRInputBase {
       }
     }
 
-    private Credentials maybeGetCredentials() {
-      Credentials credentials = null;
+    private Collection<URI> maybeGetURIsForCredentials() {
       if (getCredentialsForSourceFilesystem && inputPaths != null) {
         try {
           List<URI> uris = Lists.newLinkedList();
@@ -321,13 +328,12 @@ public class MRInput extends MRInputBase {
             Path qPath = fs.makeQualified(path);
             uris.add(qPath.toUri());
           }
-          credentials = new Credentials();
-          TezClientUtils.addFileSystemCredentialsFromURIs(uris, credentials, conf);
+          return uris;
         } catch (IOException e) {
           throw new TezUncheckedException(e);
         }
       }
-      return credentials;
+      return null;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ab9b41d..421fc8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -21,7 +21,9 @@ package org.apache.tez.mapreduce.output;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.net.URI;
 import java.text.NumberFormat;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -164,25 +166,27 @@ public class MROutput extends AbstractLogicalOutput {
                   FileOutputFormat.class.getName());
         }
       }
-      Credentials credentials = null;
+      Collection<URI> uris = null;
       if (getCredentialsForSinkFilesystem && outputPath != null) {
         try {
           Path path = new Path(outputPath);
           FileSystem fs;
           fs = path.getFileSystem(conf);
           Path qPath = fs.makeQualified(path);
-          credentials = new Credentials();
-          TezClientUtils.addFileSystemCredentialsFromURIs(Collections.singletonList(qPath.toUri()),
-              credentials, conf);
+          uris = Collections.singletonList(qPath.toUri());
         } catch (IOException e) {
           throw new TezUncheckedException(e);
         }
       }
 
-      return DataSinkDescriptor.create(
+      DataSinkDescriptor ds = DataSinkDescriptor.create(
           OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
           (doCommit ? OutputCommitterDescriptor.create(
-              MROutputCommitter.class.getName()) : null), credentials);
+              MROutputCommitter.class.getName()) : null), null);
+      if (uris != null) {
+        ds.addURIsForCredentials(uris);
+      }
+      return ds;
     }
     
     /**