You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ka...@apache.org on 2013/04/15 00:34:52 UTC

git commit: SQOOP-895: Sqoop2: Do not serialize framework and connector configurations into mapreduce configuration object

Updated Branches:
  refs/heads/sqoop2 c4ddeb7ff -> 0a0a65a29


SQOOP-895: Sqoop2: Do not serialize framework and connector configurations into mapreduce configuration object

(Jarek Jarcec Cecho via Kate Ting)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0a0a65a2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0a0a65a2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0a0a65a2

Branch: refs/heads/sqoop2
Commit: 0a0a65a29fcd01251e74e63e8963f8c250e12668
Parents: c4ddeb7
Author: Kate Ting <ka...@apache.org>
Authored: Sun Apr 14 18:27:58 2013 -0400
Committer: Kate Ting <ka...@apache.org>
Committed: Sun Apr 14 18:27:58 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/sqoop/job/JobConstants.java    |   13 ++++++
 .../apache/sqoop/job/mr/ConfigurationUtils.java    |   31 ++++++++-------
 .../mapreduce/MapreduceSubmissionEngine.java       |   23 ++++++-----
 3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index e16a2c4..e2b3ce8 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.job;
 
+import org.apache.hadoop.io.Text;
 import org.apache.sqoop.core.ConfigurationConstants;
 
 public final class JobConstants extends Constants {
@@ -67,15 +68,27 @@ public final class JobConstants extends Constants {
   public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
     PREFIX_JOB_CONFIG + "config.connector.connection";
 
+  public static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY =
+    new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
+
   public static final String JOB_CONFIG_CONNECTOR_JOB =
     PREFIX_JOB_CONFIG + "config.connector.job";
 
+  public static final Text JOB_CONFIG_CONNECTOR_JOB_KEY =
+    new Text(JOB_CONFIG_CONNECTOR_JOB);
+
   public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
     PREFIX_JOB_CONFIG + "config.framework.connection";
 
+  public static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY =
+    new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
+
   public static final String JOB_CONFIG_FRAMEWORK_JOB =
     PREFIX_JOB_CONFIG + "config.framework.job";
 
+  public static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY =
+    new Text(JOB_CONFIG_FRAMEWORK_JOB);
+
   public static final String PREFIX_CONNECTOR_CONTEXT =
     PREFIX_JOB_CONFIG + "connector.context.";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 4aa2128..64ec437 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -18,6 +18,8 @@
 package org.apache.sqoop.job.mr;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MJob;
@@ -33,48 +35,49 @@ public final class ConfigurationUtils {
   }
 
   public static Object getConnectorConnection(Configuration configuration) {
-    return loadConfiguration(configuration,
+    return loadConfiguration((JobConf) configuration,
       JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
-      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
+      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
   }
 
   public static Object getConnectorJob(Configuration configuration) {
-    return loadConfiguration(configuration,
+    return loadConfiguration((JobConf) configuration,
       JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
-      JobConstants.JOB_CONFIG_CONNECTOR_JOB);
+      JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY);
   }
 
   public static Object getFrameworkConnection(Configuration configuration) {
-    return loadConfiguration(configuration,
+    return loadConfiguration((JobConf) configuration,
       JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
-      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
+      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
   }
 
   public static Object getFrameworkJob(Configuration configuration) {
-    return loadConfiguration(configuration,
+    return loadConfiguration((JobConf) configuration,
       JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
-      JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
+      JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY);
   }
 
   /**
-   * Load configuration instance serialized in Hadoop configuration object
-   * @param configuration Hadoop configuration object associated with the job
+   * Load configuration instance serialized in Hadoop credentials cache.
+   *
+   * @param configuration JobConf object associated with the job
    * @param classProperty Property with stored configuration class name
    * @param valueProperty Property with stored JSON representation of the
    *                      configuration object
    * @return New instance with loaded data
    */
-  private static Object loadConfiguration(Configuration configuration,
-                                          String classProperty,
-                                          String valueProperty) {
+  private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
     // Create new instance of configuration class
     Object object = ClassUtils.instantiate(configuration.get(classProperty));
     if(object == null) {
       return null;
     }
 
+    String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
+
     // Fill it with JSON data
-    FormUtils.fillValues(configuration.get(valueProperty), object);
+    FormUtils.fillValues(json, object);
 
     // And give it back
     return object;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 77f30ea..001fb02 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.submission.mapreduce;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
@@ -26,6 +27,7 @@ import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
@@ -181,16 +183,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
       request.getConfigFrameworkJob().getClass().getName());
 
-    // And finally configuration data
-    configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION,
-      FormUtils.toJson(request.getConfigConnectorConnection()));
-    configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB,
-      FormUtils.toJson(request.getConfigConnectorJob()));
-    configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION,
-      FormUtils.toJson(request.getConfigFrameworkConnection()));
-    configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
-      FormUtils.toJson(request.getConfigFrameworkConnection()));
-
     // Set up notification URL if it's available
     if(request.getNotificationUrl() != null) {
       configuration.set("job.end.notification.url", request.getNotificationUrl());
@@ -217,6 +209,17 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     try {
       Job job = new Job(configuration);
 
+      // Finally, store each configuration object as JSON in the job's credentials cache
+      Credentials credentials = job.getCredentials();
+      credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY,
+        FormUtils.toJson(request.getConfigConnectorConnection()).getBytes());
+      credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY,
+        FormUtils.toJson(request.getConfigConnectorJob()).getBytes());
+      credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY,
+        FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
+      credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY,
+        FormUtils.toJson(request.getConfigFrameworkJob()).getBytes());
+
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
       } else {