You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/25 23:21:23 UTC
git commit: TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials (bikas)
Repository: tez
Updated Branches:
refs/heads/master 2cee9acaa -> 51c8a8b71
TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51c8a8b7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51c8a8b7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51c8a8b7
Branch: refs/heads/master
Commit: 51c8a8b7187af3791f6985d046b428fcb30666d6
Parents: 2cee9ac
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 25 14:19:42 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 25 14:20:23 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/TezClientUtils.java | 10 ++++
.../main/java/org/apache/tez/dag/api/DAG.java | 1 -
.../apache/tez/dag/api/DataSinkDescriptor.java | 48 ++++++++++++++++++++
.../tez/dag/api/DataSourceDescriptor.java | 44 ++++++++++++++++++
.../java/org/apache/tez/dag/api/Vertex.java | 7 ++-
.../org/apache/tez/mapreduce/input/MRInput.java | 30 +++++++-----
.../apache/tez/mapreduce/output/MROutput.java | 16 ++++---
8 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 44a450a..d7fa961 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@ ALL CHANGES:
TEZ-1613. Decrease running time for TestAMRecovery
TEZ-1240. Add system test for propagation of diagnostics for errors
TEZ-1618. LocalTaskSchedulerService.getTotalResources() and getAvailableResources() can get negative if JVM memory is larger than 2GB
+ TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 1e01138..f011f60 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -83,6 +83,8 @@ import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -356,6 +358,14 @@ public class TezClientUtils {
for (LocalResource lr: v.getTaskLocalFiles().values()) {
lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
}
+ List<DataSourceDescriptor> dataSources = v.getDataSources();
+ for (DataSourceDescriptor dataSource : dataSources) {
+ addFileSystemCredentialsFromURIs(dataSource.getURIsForCredentials(), dagCredentials, conf);
+ }
+ List<DataSinkDescriptor> dataSinks = v.getDataSinks();
+ for (DataSinkDescriptor dataSink : dataSinks) {
+ addFileSystemCredentialsFromURIs(dataSink.getURIsForCredentials(), dagCredentials, conf);
+ }
}
for (LocalResource lr: dag.getTaskLocalFiles().values()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c28f210..9b428f0 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -640,7 +640,6 @@ public class DAG {
if (dataSource.getCredentials() != null) {
dagCredentials.addAll(dataSource.getCredentials());
}
- vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles());
if (dataSource.getAdditionalLocalFiles() != null) {
TezCommonUtils.addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
index bc43c88..4d0d615 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -18,11 +18,18 @@
package org.apache.tez.dag.api;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+
import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.security.Credentials;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
/**
* Defines the output and output committer for a data sink
*
@@ -33,6 +40,7 @@ public class DataSinkDescriptor {
private final OutputCommitterDescriptor committerDescriptor;
private final Credentials credentials;
+ private final Collection<URI> urisForCredentials = Sets.newHashSet();
/**
* Create a {@link DataSinkDescriptor}
@@ -83,14 +91,54 @@ public class DataSinkDescriptor {
return new DataSinkDescriptor(outputDescriptor, committerDescriptor, credentials);
}
+ /**
+ * Get the {@link OutputDescriptor} for this {@link DataSinkDescriptor}
+ * @return {@link OutputDescriptor}
+ */
public OutputDescriptor getOutputDescriptor() {
return outputDescriptor;
}
+ /**
+ * Get the {@link OutputCommitterDescriptor} for this {@link DataSinkDescriptor}
+ * @return {@link OutputCommitterDescriptor}
+ */
public @Nullable OutputCommitterDescriptor getOutputCommitterDescriptor() {
return committerDescriptor;
}
+ /**
+ * This method can be used to specify a list of URIs for which Credentials
+ * need to be obtained so that the job can run. An incremental list of URIs
+ * can be provided by making multiple calls to the method.
+ *
+ * Currently, {@link Credentials} can only be fetched for HDFS and other
+ * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+ * credentials.
+ *
+ * @param uris
+ * a list of {@link URI}s
+ * @return this
+ */
+ public synchronized DataSinkDescriptor addURIsForCredentials(Collection<URI> uris) {
+ Preconditions.checkNotNull(uris, "URIs cannot be null");
+ urisForCredentials.addAll(uris);
+ return this;
+ }
+
+ /**
+ * Get the URIs for which credentials will be obtained
+ * @return an unmodifiable collection representing the URIs for which credentials
+ * are required.
+ */
+ public Collection<URI> getURIsForCredentials() {
+ return Collections.unmodifiableCollection(urisForCredentials);
+ }
+
+ /**
+ * Get the {@link Credentials} for this {@link DataSinkDescriptor}
+ * @return {@link Credentials}
+ */
public @Nullable Credentials getCredentials() {
return credentials;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
index 78ddefa..1c5c16d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -20,9 +20,13 @@ package org.apache.tez.dag.api;
import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -30,6 +34,9 @@ import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
/**
* Defines the input and input initializer for a data source
*
@@ -43,6 +50,7 @@ public class DataSourceDescriptor {
private final int numShards;
private final VertexLocationHint locationHint;
private final Map<String, LocalResource> additionalLocalFiles;
+ private final Collection<URI> urisForCredentials = Sets.newHashSet();
private DataSourceDescriptor(InputDescriptor inputDescriptor,
@Nullable InputInitializerDescriptor initializerDescriptor,
@@ -119,14 +127,50 @@ public class DataSourceDescriptor {
locationHint, additionalLocalFiles);
}
+ /**
+ * Get the {@link InputDescriptor} for this {@link DataSourceDescriptor}
+ * @return {@link InputDescriptor}
+ */
public InputDescriptor getInputDescriptor() {
return inputDescriptor;
}
+ /**
+ * Get the {@link InputInitializerDescriptor} for this {@link DataSourceDescriptor}
+ * @return {@link InputInitializerDescriptor}
+ */
public @Nullable InputInitializerDescriptor getInputInitializerDescriptor() {
return initializerDescriptor;
}
+ /**
+ * This method can be used to specify a list of URIs for which Credentials
+ * need to be obtained so that the job can run. An incremental list of URIs
+ * can be provided by making multiple calls to the method.
+ *
+ * Currently, {@link Credentials} can only be fetched for HDFS and other
+ * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+ * credentials.
+ *
+ * @param uris
+ * a list of {@link URI}s
+ * @return this
+ */
+ public synchronized DataSourceDescriptor addURIsForCredentials(Collection<URI> uris) {
+ Preconditions.checkNotNull(uris, "URIs cannot be null");
+ urisForCredentials.addAll(uris);
+ return this;
+ }
+
+ /**
+ * Get the URIs for which credentials will be obtained
+ * @return an unmodifiable collection representing the URIs for which credentials
+ * are required.
+ */
+ public Collection<URI> getURIsForCredentials() {
+ return Collections.unmodifiableCollection(urisForCredentials);
+ }
+
/**
* Number of shards for this data source. If a vertex has only one
* data source this the number of tasks in the vertex should be set to
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 0dffbef..04acdaf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -440,11 +441,13 @@ public class Vertex {
this.taskResource = resource;
}
- List<DataSourceDescriptor> getDataSources() {
+ @Private
+ public List<DataSourceDescriptor> getDataSources() {
return dataSources;
}
- List<DataSinkDescriptor> getDataSinks() {
+ @Private
+ public List<DataSinkDescriptor> getDataSinks() {
return dataSinks;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ddb001c..f38fc9c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -39,7 +40,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.Credentials;
-import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
@@ -267,7 +267,7 @@ public class MRInput extends MRInputBase {
MRHelpers.translateMRConfToTez(conf);
- Credentials credentials = maybeGetCredentials();
+ Collection<URI> uris = maybeGetURIsForCredentials();
UserPayload payload = null;
if (groupSplitsInAM) {
@@ -276,16 +276,20 @@ public class MRInput extends MRInputBase {
payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
}
- return DataSourceDescriptor
+ DataSourceDescriptor ds = DataSourceDescriptor
.create(InputDescriptor.create(inputClassName).setUserPayload(payload),
- customInitializerDescriptor, credentials);
+ customInitializerDescriptor, null);
+ if (uris != null) {
+ ds.addURIsForCredentials(uris);
+ }
+ return ds;
}
private DataSourceDescriptor createGeneratorDataSource() throws IOException {
setupBasicConf(conf);
MRHelpers.translateMRConfToTez(conf);
- Credentials credentials = maybeGetCredentials();
+ Collection<URI> uris = maybeGetURIsForCredentials();
UserPayload payload = null;
if (groupSplitsInAM) {
@@ -293,9 +297,13 @@ public class MRInput extends MRInputBase {
} else {
payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
}
- return DataSourceDescriptor.create(
+ DataSourceDescriptor ds = DataSourceDescriptor.create(
InputDescriptor.create(inputClassName).setUserPayload(payload),
- InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), credentials);
+ InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), null);
+ if (uris != null) {
+ ds.addURIsForCredentials(uris);
+ }
+ return ds;
}
private void setupBasicConf(Configuration inputConf) {
@@ -309,8 +317,7 @@ public class MRInput extends MRInputBase {
}
}
- private Credentials maybeGetCredentials() {
- Credentials credentials = null;
+ private Collection<URI> maybeGetURIsForCredentials() {
if (getCredentialsForSourceFilesystem && inputPaths != null) {
try {
List<URI> uris = Lists.newLinkedList();
@@ -321,13 +328,12 @@ public class MRInput extends MRInputBase {
Path qPath = fs.makeQualified(path);
uris.add(qPath.toUri());
}
- credentials = new Credentials();
- TezClientUtils.addFileSystemCredentialsFromURIs(uris, credentials, conf);
+ return uris;
} catch (IOException e) {
throw new TezUncheckedException(e);
}
}
- return credentials;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/51c8a8b7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ab9b41d..421fc8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -21,7 +21,9 @@ package org.apache.tez.mapreduce.output;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.net.URI;
import java.text.NumberFormat;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -164,25 +166,27 @@ public class MROutput extends AbstractLogicalOutput {
FileOutputFormat.class.getName());
}
}
- Credentials credentials = null;
+ Collection<URI> uris = null;
if (getCredentialsForSinkFilesystem && outputPath != null) {
try {
Path path = new Path(outputPath);
FileSystem fs;
fs = path.getFileSystem(conf);
Path qPath = fs.makeQualified(path);
- credentials = new Credentials();
- TezClientUtils.addFileSystemCredentialsFromURIs(Collections.singletonList(qPath.toUri()),
- credentials, conf);
+ uris = Collections.singletonList(qPath.toUri());
} catch (IOException e) {
throw new TezUncheckedException(e);
}
}
- return DataSinkDescriptor.create(
+ DataSinkDescriptor ds = DataSinkDescriptor.create(
OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
(doCommit ? OutputCommitterDescriptor.create(
- MROutputCommitter.class.getName()) : null), credentials);
+ MROutputCommitter.class.getName()) : null), null);
+ if (uris != null) {
+ ds.addURIsForCredentials(uris);
+ }
+ return ds;
}
/**