Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:52 UTC

[23/52] [abbrv] git commit: SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs

SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d539dd4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d539dd4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d539dd4

Branch: refs/heads/SQOOP-1367
Commit: 3d539dd4d7477324dfe62a4e57f684351769b000
Parents: af25bcc
Author: Abraham Elmahrek <ab...@elmahrek.com>
Authored: Fri Sep 19 16:24:59 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/json/SubmissionBean.java   |  20 +-
 .../org/apache/sqoop/model/MSubmission.java     |  31 +-
 .../apache/sqoop/json/TestSubmissionBean.java   |  12 +-
 .../sqoop/connector/jdbc/TestToInitializer.java |   1 -
 .../idf/CSVIntermediateDataFormat.java          |   4 -
 .../connector/idf/IntermediateDataFormat.java   |   4 -
 .../idf/CSVIntermediateDataFormatTest.java      | 222 ------
 .../idf/TestCSVIntermediateDataFormat.java      | 222 ++++++
 .../sqoop/connector/ConnectorManager.java       |   5 +-
 .../apache/sqoop/framework/ExecutionEngine.java |  20 +-
 .../org/apache/sqoop/framework/JobManager.java  | 447 ++++++------
 .../org/apache/sqoop/framework/JobRequest.java  | 356 ++++++++++
 .../sqoop/framework/SubmissionEngine.java       |   7 +-
 .../sqoop/framework/SubmissionRequest.java      | 361 ----------
 .../sqoop/framework/TestFrameworkValidator.java | 182 +++--
 .../apache/sqoop/framework/TestJobManager.java  | 173 +++++
 .../apache/sqoop/framework/TestJobRequest.java  |  71 ++
 .../sqoop/framework/TestSubmissionRequest.java  |  71 --
 .../sqoop/repository/TestJdbcRepository.java    | 694 +++++++++----------
 .../sqoop/execution/mapreduce/MRJobRequest.java | 102 +++
 .../mapreduce/MRSubmissionRequest.java          | 102 ---
 .../mapreduce/MapreduceExecutionEngine.java     |  51 +-
 .../apache/sqoop/job/mr/ConfigurationUtils.java |   3 -
 .../apache/sqoop/job/mr/ProgressRunnable.java   |   4 +-
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |   7 +-
 .../sqoop/job/mr/SqoopFileOutputFormat.java     |   4 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  50 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  28 +-
 .../sqoop/shell/utils/SubmissionDisplayer.java  |   8 +-
 .../org/apache/sqoop/job/etl/CallbackBase.java  |  49 --
 .../java/org/apache/sqoop/job/etl/From.java     |   2 +-
 .../main/java/org/apache/sqoop/job/etl/To.java  |   2 +-
 .../org/apache/sqoop/job/etl/Transferable.java  |  51 ++
 .../org/apache/sqoop/validation/Validator.java  |   1 -
 .../mapreduce/MapreduceSubmissionEngine.java    |  12 +-
 35 files changed, 1780 insertions(+), 1599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
index 61d6576..9b1ae74 100644
--- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
@@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
   private static final String EXCEPTION_TRACE = "exception-trace";
   private static final String PROGRESS = "progress";
   private static final String COUNTERS = "counters";
-  private static final String CONNECTOR_SCHEMA = "schema-connector";
-  private static final String HIO_SCHEMA = "schema-hio";
+  private static final String FROM_SCHEMA = "schema-from";
+  private static final String TO_SCHEMA = "schema-to";
 
   private List<MSubmission> submissions;
 
@@ -116,11 +116,11 @@ public class SubmissionBean implements JsonBean {
       if(submission.getCounters() != null) {
         object.put(COUNTERS, extractCounters(submission.getCounters()));
       }
-      if(submission.getConnectorSchema() != null)  {
-        object.put(CONNECTOR_SCHEMA, extractSchema(submission.getConnectorSchema()));
+      if(submission.getFromSchema() != null)  {
+        object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
       }
-      if(submission.getHioSchema() != null) {
-        object.put(HIO_SCHEMA, extractSchema(submission.getHioSchema()));
+      if(submission.getToSchema() != null) {
+        object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
       }
 
       array.add(object);
@@ -188,11 +188,11 @@ public class SubmissionBean implements JsonBean {
       if(object.containsKey(COUNTERS)) {
         submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
       }
-      if(object.containsKey(CONNECTOR_SCHEMA)) {
-        submission.setConnectorSchema(restoreSchemna((JSONObject) object.get(CONNECTOR_SCHEMA)));
+      if(object.containsKey(FROM_SCHEMA)) {
+        submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA)));
       }
-      if(object.containsKey(HIO_SCHEMA)) {
-        submission.setHioSchema(restoreSchemna((JSONObject) object.get(HIO_SCHEMA)));
+      if(object.containsKey(TO_SCHEMA)) {
+        submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA)));
       }
 
       this.submissions.add(submission);
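
For consumers of the REST payload, the practical effect of this hunk is that the two schema entries are now keyed "schema-from" and "schema-to" instead of "schema-connector" and "schema-hio". Below is a minimal sketch, using json-simple (the JSON library already used here), of probing the renamed keys; only the key names come from the patch above, the payload contents and class name are illustrative.

    import org.json.simple.JSONObject;

    public class SchemaKeySketch {
      public static void main(String[] args) {
        JSONObject submission = new JSONObject();
        submission.put("schema-from", new JSONObject());  // was "schema-connector"
        submission.put("schema-to", new JSONObject());    // was "schema-hio"

        // Older clients that still probe the pre-refactor keys will now miss both.
        boolean hasFrom = submission.containsKey("schema-from");
        boolean hasTo = submission.containsKey("schema-to");
        System.out.println("from=" + hasFrom + ", to=" + hasTo);
      }
    }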

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/model/MSubmission.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
index 1edd6ee..ca21135 100644
--- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java
+++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
@@ -100,20 +100,21 @@ public class MSubmission extends MAccountableEntity {
   String exceptionStackTrace;
 
   /**
-   * Schema that was reported by the connector.
+   * Schema for the FROM part of the job submission
    *
    * This property is required.
    */
-  Schema connectorSchema;
+  Schema fromSchema;
 
   /**
+   * Schema for the TO part of the job submission
    * Optional schema that reported by the underlying I/O implementation. Please
-   * note that this property might be empty and in such case the connector
-   * schema will use also on Hadoop I/O side.
+   * note that this property might be empty and in such case use the FROM schema
+   * on the TO side.
    *
    * This property is optional.
    */
-  Schema hioSchema;
+  Schema toSchema;
 
   public MSubmission() {
     status = SubmissionStatus.UNKNOWN;
@@ -219,20 +220,20 @@ public class MSubmission extends MAccountableEntity {
     this.setExceptionStackTrace(writer.toString());
   }
 
-  public Schema getConnectorSchema() {
-    return connectorSchema;
+  public Schema getFromSchema() {
+    return fromSchema;
   }
 
-  public void setConnectorSchema(Schema connectorSchema) {
-    this.connectorSchema = connectorSchema;
+  public void setFromSchema(Schema connectorSchema) {
+    this.fromSchema = connectorSchema;
   }
 
-  public Schema getHioSchema() {
-    return hioSchema;
+  public Schema getToSchema() {
+    return toSchema;
   }
 
-  public void setHioSchema(Schema hioSchema) {
-    this.hioSchema = hioSchema;
+  public void setToSchema(Schema hioSchema) {
+    this.toSchema = hioSchema;
   }
 
   @Override
@@ -248,8 +249,8 @@ public class MSubmission extends MAccountableEntity {
       ", externalLink='" + externalLink + '\'' +
       ", exceptionInfo='" + exceptionInfo + '\'' +
       ", exceptionStackTrace='" + exceptionStackTrace + '\'' +
-      ", connectorSchema='" + connectorSchema + '\'' +
-      ", hioSchema='" + hioSchema + '\'' +
+      ", fromSchema='" + fromSchema + '\'' +
+      ", toSchema='" + toSchema + '\'' +
       '}';
   }
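
For code that reads the submission model directly, the accessors now mirror the FROM/TO terminology. A short sketch follows, assuming only the no-arg MSubmission constructor and the Schema(String) constructor used by the tests in this commit; the schema names are illustrative.

    import org.apache.sqoop.model.MSubmission;
    import org.apache.sqoop.schema.Schema;

    public class FromToSchemaSketch {
      public static void main(String[] args) {
        MSubmission submission = new MSubmission();
        submission.setFromSchema(new Schema("employees"));    // formerly setConnectorSchema
        submission.setToSchema(new Schema("employees_copy")); // formerly setHioSchema

        // Per the updated javadoc, fall back to the FROM schema when TO is absent.
        Schema effectiveTo = submission.getToSchema() != null
            ? submission.getToSchema()
            : submission.getFromSchema();
        System.out.println(effectiveTo);
      }
    }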
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
index d87655e..518c9cb 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
@@ -405,20 +405,20 @@ public class TestSubmissionBean extends TestCase {
     assertEquals(222222, counter.getValue());
   }
 
-  public void testTransferConnectorSchema() {
+  public void testTransferFromSchema() {
     MSubmission source = new MSubmission();
-    source.setConnectorSchema(getSchema());
+    source.setFromSchema(getSchema());
 
-    Schema target = transfer(source).getConnectorSchema();
+    Schema target = transfer(source).getFromSchema();
     assertNotNull(target);
     assertEquals(getSchema(), target);
   }
 
-  public void testTransferHioSchema() {
+  public void testTransferToSchema() {
     MSubmission source = new MSubmission();
-    source.setHioSchema(getSchema());
+    source.setToSchema(getSchema());
 
-    Schema target = transfer(source).getHioSchema();
+    Schema target = transfer(source).getToSchema();
     assertNotNull(target);
     assertEquals(getSchema(), target);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
index eb6fcf1..4767215 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.ValidationResult;
 import org.apache.sqoop.validation.ValidationRunner;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 1e8ab52..df5cb9c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -21,7 +21,6 @@ package org.apache.sqoop.connector.idf;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.Column;
@@ -38,7 +37,6 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
@@ -46,8 +44,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   public static final char ESCAPE_CHARACTER = '\\';
   public static final char QUOTE_CHARACTER = '\'';
 
-  private static final Logger LOG = Logger.getLogger
-    (CSVIntermediateDataFormat.class);
 
   private static final char[] originals = {
     0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 91b594e..66d46a3 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -19,14 +19,10 @@
 package org.apache.sqoop.connector.idf;
 
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.Type;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Abstract class representing a pluggable intermediate data format the Sqoop

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
deleted file mode 100644
index df6d30f..0000000
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.connector.idf;
-
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Binary;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.Text;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class CSVIntermediateDataFormatTest {
-
-  private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
-
-  private IntermediateDataFormat<?> data;
-
-  @Before
-  public void setUp() {
-    data = new CSVIntermediateDataFormat();
-  }
-
-  private String getByteFieldString(byte[] byteFieldData) {
-    try {
-      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
-    } catch(UnsupportedEncodingException e) {
-      // Should never get to this point because ISO-8859-1 is a standard codec.
-      return null;
-    }
-  }
-
-  @Test
-  public void testStringInStringOut() {
-    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
-      + ",'" + String.valueOf(0x0A) + "'";
-    data.setTextData(testData);
-    assertEquals(testData, data.getTextData());
-  }
-
-  @Test
-  public void testNullStringInObjectOut() {
-    Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(null);
-
-    Object[] out = data.getObjectData();
-
-    assertNull(out);
-  }
-
-  @Test(expected=SqoopException.class)
-  public void testEmptyStringInObjectOut() {
-    Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData("");
-
-    data.getObjectData();
-  }
-
-  @Test
-  public void testStringInObjectOut() {
-
-    //byte[0] = -112, byte[1] = 54 - 2's complements
-    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
-      + ",'\\n'";
-    Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(testData);
-
-    Object[] out = data.getObjectData();
-
-    assertEquals(new Long(10),out[0]);
-    assertEquals(new Long(34),out[1]);
-    assertEquals("54",out[2]);
-    assertEquals("random data",out[3]);
-    assertEquals(-112, ((byte[])out[4])[0]);
-    assertEquals(54, ((byte[])out[4])[1]);
-    assertEquals("\n", out[5].toString());
-  }
-
-  @Test
-  public void testObjectInStringOut() {
-    Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
-    data.setSchema(schema);
-
-    byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
-    Object[] in = new Object[6];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-    in[2] = "54";
-    in[3] = "random data";
-    in[4] = byteFieldData;
-    in[5] = new String(new char[] { 0x0A });
-
-    data.setObjectData(in);
-
-    //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
-    String testData = "10,34,'54','random data'," +
-        getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
-  }
-
-  @Test
-  public void testObjectInObjectOut() {
-    //Test escapable sequences too.
-    //byte[0] = -112, byte[1] = 54 - 2's complements
-    Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
-    data.setSchema(schema);
-
-    Object[] in = new Object[6];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-    in[2] = "54";
-    in[3] = "random data";
-    in[4] = new byte[] { (byte) -112, (byte) 54};
-    in[5] = new String(new char[] { 0x0A });
-    Object[] inCopy = new Object[6];
-    System.arraycopy(in,0,inCopy,0,in.length);
-
-    // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-  }
-
-  @Test
-  public void testStringFullRangeOfCharacters() {
-    Schema schema = new Schema("test");
-    schema.addColumn(new Text("1"));
-    data.setSchema(schema);
-
-    char[] allCharArr = new char[256];
-    for(int i = 0; i < allCharArr.length; ++i) {
-      allCharArr[i] = (char)i;
-    }
-    String strData = new String(allCharArr);
-
-    Object[] in = {strData};
-    Object[] inCopy = new Object[1];
-    System.arraycopy(in,0,inCopy,0,in.length);
-
-    // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-
-    assertEquals(strData, data.getObjectData()[0]);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-  }
-
-  @Test
-  public void testByteArrayFullRangeOfCharacters() {
-    Schema schema = new Schema("test");
-    schema.addColumn(new Binary("1"));
-    data.setSchema(schema);
-
-    byte[] allCharByteArr = new byte[256];
-    for(int i = 0; i < allCharByteArr.length; ++i) {
-      allCharByteArr[i] = (byte)i;
-    }
-
-    Object[] in = {allCharByteArr};
-    Object[] inCopy = new Object[1];
-    System.arraycopy(in,0,inCopy,0,in.length);
-
-    // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
new file mode 100644
index 0000000..8c83a71
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestCSVIntermediateDataFormat {
+
+  private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
+
+  private IntermediateDataFormat<?> data;
+
+  @Before
+  public void setUp() {
+    data = new CSVIntermediateDataFormat();
+  }
+
+  private String getByteFieldString(byte[] byteFieldData) {
+    try {
+      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
+    } catch(UnsupportedEncodingException e) {
+      // Should never get to this point because ISO-8859-1 is a standard codec.
+      return null;
+    }
+  }
+
+  @Test
+  public void testStringInStringOut() {
+    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+      + ",'" + String.valueOf(0x0A) + "'";
+    data.setTextData(testData);
+    assertEquals(testData, data.getTextData());
+  }
+
+  @Test
+  public void testNullStringInObjectOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData(null);
+
+    Object[] out = data.getObjectData();
+
+    assertNull(out);
+  }
+
+  @Test(expected=SqoopException.class)
+  public void testEmptyStringInObjectOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData("");
+
+    data.getObjectData();
+  }
+
+  @Test
+  public void testStringInObjectOut() {
+
+    //byte[0] = -112, byte[1] = 54 - 2's complements
+    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+      + ",'\\n'";
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData(testData);
+
+    Object[] out = data.getObjectData();
+
+    assertEquals(new Long(10),out[0]);
+    assertEquals(new Long(34),out[1]);
+    assertEquals("54",out[2]);
+    assertEquals("random data",out[3]);
+    assertEquals(-112, ((byte[])out[4])[0]);
+    assertEquals(54, ((byte[])out[4])[1]);
+    assertEquals("\n", out[5].toString());
+  }
+
+  @Test
+  public void testObjectInStringOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+
+    byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+    Object[] in = new Object[6];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = "54";
+    in[3] = "random data";
+    in[4] = byteFieldData;
+    in[5] = new String(new char[] { 0x0A });
+
+    data.setObjectData(in);
+
+    //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
+    String testData = "10,34,'54','random data'," +
+        getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+    assertEquals(testData, data.getTextData());
+  }
+
+  @Test
+  public void testObjectInObjectOut() {
+    //Test escapable sequences too.
+    //byte[0] = -112, byte[1] = 54 - 2's complements
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+
+    Object[] in = new Object[6];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = "54";
+    in[3] = "random data";
+    in[4] = new byte[] { (byte) -112, (byte) 54};
+    in[5] = new String(new char[] { 0x0A });
+    Object[] inCopy = new Object[6];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+
+  @Test
+  public void testStringFullRangeOfCharacters() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new Text("1"));
+    data.setSchema(schema);
+
+    char[] allCharArr = new char[256];
+    for(int i = 0; i < allCharArr.length; ++i) {
+      allCharArr[i] = (char)i;
+    }
+    String strData = new String(allCharArr);
+
+    Object[] in = {strData};
+    Object[] inCopy = new Object[1];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+
+    assertEquals(strData, data.getObjectData()[0]);
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+
+  @Test
+  public void testByteArrayFullRangeOfCharacters() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new Binary("1"));
+    data.setSchema(schema);
+
+    byte[] allCharByteArr = new byte[256];
+    for(int i = 0; i < allCharByteArr.length; ++i) {
+      allCharByteArr[i] = (byte)i;
+    }
+
+    Object[] in = {allCharByteArr};
+    Object[] inCopy = new Object[1];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index db6f579..c87df84 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -114,10 +114,9 @@ public class ConnectorManager implements Reconfigurable {
     return bundles;
   }
 
-  public ResourceBundle getResourceBundle(long connectorId,
-                                                 Locale locale) {
+  public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
     ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
-    return  handler.getConnector().getBundle(locale);
+    return handler.getConnector().getBundle(locale);
   }
 
   public MConnector getConnectorMetadata(long connectorId) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index 96ec148..75b570d 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -18,12 +18,11 @@
 package org.apache.sqoop.framework;
 
 import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MSubmission;
 
 /**
- * Execution engine drive execution of sqoop submission (job). It's responsible
+ * Execution engine drives execution of a sqoop job. It's responsible
  * for executing all defined steps in the import/export workflow.
+ * A successful job execution will be recorded in the job submission entity.
  */
 public abstract class ExecutionEngine {
 
@@ -31,6 +30,7 @@ public abstract class ExecutionEngine {
    * Initialize execution engine
    *
    * @param context Configuration context
+   * @param prefix Execution engine prefix
    */
   public void initialize(ImmutableContext context, String prefix) {
   }
@@ -42,19 +42,19 @@ public abstract class ExecutionEngine {
   }
 
   /**
-   * Return new SubmissionRequest class or any subclass if it's needed by
+   * Return new JobRequest class or any subclass if it's needed by
    * execution and submission engine combination.
    *
-   * @return New Submission request object
+   * @return new JobRequest object
    */
-  public SubmissionRequest createSubmissionRequest() {
-    return new SubmissionRequest();
+  public JobRequest createJobRequest() {
+    return new JobRequest();
   }
 
   /**
-   * Prepare given submission request.
+   * Prepare given job request.
    *
-   * @param request Submission request
+   * @param request JobRequest
    */
-  public abstract void prepareSubmission(SubmissionRequest request);
+  public abstract void prepareJob(JobRequest request);
 }
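
Taken together, a third-party engine now overrides prepareJob(JobRequest) rather than prepareSubmission(SubmissionRequest). A hedged sketch of the shape such an engine takes under the refactored API; the class name and method bodies are illustrative, not part of the patch.

    import org.apache.sqoop.common.ImmutableContext;
    import org.apache.sqoop.framework.ExecutionEngine;
    import org.apache.sqoop.framework.JobRequest;

    public class NoopExecutionEngine extends ExecutionEngine {

      @Override
      public void initialize(ImmutableContext context, String prefix) {
        // Read engine-specific settings from the context under the given prefix.
      }

      @Override
      public JobRequest createJobRequest() {
        // Return a JobRequest subclass when the engine needs extra state,
        // as MRJobRequest does for the Map/Reduce engine elsewhere in this commit.
        return new JobRequest();
      }

      @Override
      public void prepareJob(JobRequest request) {
        // Translate the connector and framework configs carried by the request
        // into engine-specific job settings before submission.
      }
    }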

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index b1b37f6..8149d1c 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -1,3 +1,5 @@
+
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -266,264 +268,228 @@ public class JobManager implements Reconfigurable {
   }
 
   public MSubmission submit(long jobId, HttpEventContext ctx) {
-    String username = ctx.getUsername();
-
-    Repository repository = RepositoryManager.getInstance().getRepository();
-
-    MJob job = repository.findJob(jobId);
-    if (job == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0004,
-        "Unknown job id " + jobId);
-    }
 
-    if (!job.getEnabled()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0009,
-        "Job id: " + job.getPersistenceId());
+    MSubmission mSubmission = createJobSubmission(ctx, jobId);
+    JobRequest jobRequest = createJobRequest(jobId, mSubmission);
+    // Bootstrap job to execute
+    prepareJob(jobRequest);
+    // Make sure that this job id is not currently running and submit the job
+    // only if it's not.
+    synchronized (getClass()) {
+      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
+          .findSubmissionLastForJob(jobId);
+      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
+        throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId);
+      }
+      // TODO(Abe): Call multiple destroyers.
+      // TODO(jarcec): We might need to catch all exceptions here to ensure
+      // that Destroyer will be executed in all cases.
+      // NOTE: the following is a blocking call
+      boolean success = submissionEngine.submit(jobRequest);
+      if (!success) {
+        destroySubmission(jobRequest);
+        mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
+      }
+      RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
     }
+    return mSubmission;
+  }
 
-    MConnection fromConnection = repository.findConnection(job.getConnectionId(Direction.FROM));
-    MConnection toConnection = repository.findConnection(job.getConnectionId(Direction.TO));
+  private JobRequest createJobRequest(long jobId, MSubmission submission) {
+    // get job
+    MJob job = getJob(jobId);
 
-    if (!fromConnection.getEnabled()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0010,
-        "Connection id: " + fromConnection.getPersistenceId());
-    }
+    // get from/to connections for the job
+    MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM));
+    MConnection toConnection = getConnection(job.getConnectionId(Direction.TO));
 
-    if (!toConnection.getEnabled()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0010,
-          "Connection id: " + toConnection.getPersistenceId());
-    }
+    // get from/to connectors for the connection
+    SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+    validateSupportedDirection(fromConnector, Direction.FROM);
+    SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+    validateSupportedDirection(toConnector, Direction.TO);
 
-    SqoopConnector fromConnector =
-      ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM));
-    SqoopConnector toConnector =
-        ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO));
+    // Transform config to fromConnector specific classes
+    Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
+        .getConnectionConfigurationClass());
+    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
 
-    // Make sure that connectors support the directions they will be used from.
-    if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0011,
-          "Connector: " + fromConnector.getClass().getCanonicalName());
-    }
+    // Transform config to toConnector specific classes
+    Object toConnectorConfig = ClassUtils
+        .instantiate(toConnector.getConnectionConfigurationClass());
+    FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
 
-    if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0011,
-          "Connector: " + toConnector.getClass().getCanonicalName());
-    }
+    Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
+    FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
 
-    // Transform forms to fromConnector specific classes
-    Object fromConnectorConnection = ClassUtils.instantiate(
-        fromConnector.getConnectionConfigurationClass());
-    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(),
-      fromConnectorConnection);
-
-    Object fromJob = ClassUtils.instantiate(
-      fromConnector.getJobConfigurationClass(Direction.FROM));
-    FormUtils.fromForms(
-        job.getConnectorPart(Direction.FROM).getForms(), fromJob);
-
-    // Transform forms to toConnector specific classes
-    Object toConnectorConnection = ClassUtils.instantiate(
-        toConnector.getConnectionConfigurationClass());
-    FormUtils.fromForms(toConnection.getConnectorPart().getForms(),
-        toConnectorConnection);
-
-    Object toJob = ClassUtils.instantiate(
-        toConnector.getJobConfigurationClass(Direction.TO));
+    Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
     FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
 
-    // Transform framework specific forms
-    Object fromFrameworkConnection = ClassUtils.instantiate(
-      FrameworkManager.getInstance().getConnectionConfigurationClass());
-    Object toFrameworkConnection = ClassUtils.instantiate(
-        FrameworkManager.getInstance().getConnectionConfigurationClass());
-    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(),
-      fromFrameworkConnection);
-    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(),
-        toFrameworkConnection);
-
-    Object frameworkJob = ClassUtils.instantiate(
-      FrameworkManager.getInstance().getJobConfigurationClass());
+    // Transform framework specific configs
+    // Q(VB) : Aren't the following 2 exactly the same?
+    Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
+        .getConnectionConfigurationClass());
+    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection);
+
+    Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
+        .getConnectionConfigurationClass());
+    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection);
+
+    Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance()
+        .getJobConfigurationClass());
     FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
 
-    // Create request object
-    MSubmission summary = new MSubmission(jobId);
-    SubmissionRequest request = executionEngine.createSubmissionRequest();
-
-    summary.setCreationUser(username);
-    summary.setLastUpdateUser(username);
-
-    // Save important variables to the submission request
-    request.setSummary(summary);
-    request.setConnector(Direction.FROM, fromConnector);
-    request.setConnector(Direction.TO, toConnector);
-    request.setConnectorConnectionConfig(Direction.FROM, fromConnectorConnection);
-    request.setConnectorConnectionConfig(Direction.TO, toConnectorConnection);
-    request.setConnectorJobConfig(Direction.FROM, fromJob);
-    request.setConnectorJobConfig(Direction.TO, toJob);
-    // @TODO(Abe): Should we actually have 2 different Framework Connection config objects?
-    request.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
-    request.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
-    request.setConfigFrameworkJob(frameworkJob);
-    request.setJobName(job.getName());
-    request.setJobId(job.getPersistenceId());
-    request.setNotificationUrl(notificationBaseUrl + jobId);
+    // Create a job request for submit/execution
+    JobRequest jobRequest = executionEngine.createJobRequest();
+    // Save important variables to the job request
+    jobRequest.setSummary(submission);
+    jobRequest.setConnector(Direction.FROM, fromConnector);
+    jobRequest.setConnector(Direction.TO, toConnector);
+    jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig);
+    jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig);
+    jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
+    jobRequest.setConnectorJobConfig(Direction.TO, toJob);
+    // TODO(Abe): Should we actually have 2 different Framework Connection config objects?
+    jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
+    jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
+    jobRequest.setConfigFrameworkJob(frameworkJob);
+    jobRequest.setJobName(job.getName());
+    jobRequest.setJobId(job.getPersistenceId());
+    jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
     Class<? extends IntermediateDataFormat<?>> dataFormatClass =
       fromConnector.getIntermediateDataFormat();
-    request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
-    // Create request object
+    jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
+
+
+    jobRequest.setFrom(fromConnector.getFrom());
+    jobRequest.setTo(toConnector.getTo());
+
+    addStandardJars(jobRequest);
+    addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+    addConnectorInitializerJars(jobRequest, Direction.FROM);
+    addConnectorInitializerJars(jobRequest, Direction.TO);
+
+    Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
+    Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
+
+    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
+    if (fromSchema != null) {
+      jobRequest.getSummary().setFromSchema(fromSchema);
+    }
+    else {
+      jobRequest.getSummary().setFromSchema(toSchema);
+    }
+    LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
+    return jobRequest;
+  }
+
+  private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
+      SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
+    jobRequest.addJarForClass(fromConnector.getClass());
+    jobRequest.addJarForClass(toConnector.getClass());
+    jobRequest.addJarForClass(dataFormatClass);
+  }
 
+  private void addStandardJars(JobRequest jobRequest) {
     // Let's register all important jars
     // sqoop-common
-    request.addJarForClass(MapContext.class);
+    jobRequest.addJarForClass(MapContext.class);
     // sqoop-core
-    request.addJarForClass(FrameworkManager.class);
+    jobRequest.addJarForClass(FrameworkManager.class);
     // sqoop-spi
-    request.addJarForClass(SqoopConnector.class);
+    jobRequest.addJarForClass(SqoopConnector.class);
     // Execution engine jar
-    request.addJarForClass(executionEngine.getClass());
-    // Connectors in use
-    request.addJarForClass(fromConnector.getClass());
-    request.addJarForClass(toConnector.getClass());
-
+    jobRequest.addJarForClass(executionEngine.getClass());
     // Extra libraries that Sqoop code requires
-    request.addJarForClass(JSONValue.class);
-
-    // The IDF is used in the ETL process.
-    request.addJarForClass(dataFormatClass);
-
-
-    // Get callbacks
-    request.setFromCallback(fromConnector.getFrom());
-    request.setToCallback(toConnector.getTo());
-    LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback());
-
-    // Initialize submission from fromConnector perspective
-    CallbackBase[] baseCallbacks = {
-        request.getFromCallback(),
-        request.getToCallback()
-    };
+    jobRequest.addJarForClass(JSONValue.class);
+  }
 
-    CallbackBase baseCallback;
-    Class<? extends Initializer> initializerClass;
-    Initializer initializer;
-    InitializerContext initializerContext;
+  MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
+    MSubmission summary = new MSubmission(jobId);
+    summary.setCreationUser(ctx.getUsername());
+    summary.setLastUpdateUser(ctx.getUsername());
+    return summary;
+  }
 
-    // Initialize From Connector callback.
-    baseCallback = request.getFromCallback();
+  SqoopConnector getConnector(long connnectorId) {
+    return ConnectorManager.getInstance().getConnector(connnectorId);
+  }
 
-    initializerClass = baseCallback
-        .getInitializer();
-    initializer = (Initializer) ClassUtils
-        .instantiate(initializerClass);
+  void validateSupportedDirection(SqoopConnector connector, Direction direction) {
+    // Make sure that connector supports the given direction
+    if (!connector.getSupportedDirections().contains(direction)) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
+          + connector.getClass().getCanonicalName());
+    }
+  }
 
-    if (initializer == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-          "Can't create initializer instance: " + initializerClass.getName());
+  MConnection getConnection(long connectionId) {
+    MConnection connection = RepositoryManager.getInstance().getRepository()
+        .findConnection(connectionId);
+    if (!connection.getEnabled()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
+          + connection.getPersistenceId());
     }
+    return connection;
+  }
 
-    // Initializer context
-    initializerContext = new InitializerContext(request.getConnectorContext(Direction.FROM));
+  MJob getJob(long jobId) {
+    MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
+    if (job == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId);
+    }
 
-    // Initialize submission from fromConnector perspective
-    initializer.initialize(initializerContext,
-        request.getConnectorConnectionConfig(Direction.FROM),
-        request.getConnectorJobConfig(Direction.FROM));
+    if (!job.getEnabled()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId());
+    }
+    return job;
+  }
+  
+  private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
 
-    // Add job specific jars to
-    request.addJars(initializer.getJars(initializerContext,
-        request.getConnectorConnectionConfig(Direction.FROM),
-        request.getConnectorJobConfig(Direction.FROM)));
+    Initializer initializer = getConnectorInitializer(jobRequest, direction);
 
-    // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
+    // Initializer context
+    InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
 
+    // Initialize submission from the connector perspective
+    initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction),
+        jobRequest.getConnectorJobConfig(direction));
 
-    Schema fromSchema = initializer.getSchema(initializerContext,
-            request.getConnectorConnectionConfig(Direction.FROM),
-            request.getConnectorJobConfig(Direction.FROM));
+    // TODO(Abe): Alter behavior of Schema here.
+    return initializer.getSchema(initializerContext,
+        jobRequest.getConnectorConnectionConfig(direction),
+        jobRequest.getConnectorJobConfig(direction));
+  }
 
-    // request.getSummary().setConnectorSchema(initializer.getSchema(
-    //    initializerContext,
-    //    request.getConnectorConnectionConfig(ConnectorType.FROM),
-    //    request.getConnectorJobConfig(ConnectorType.FROM)
-    // ));
+  private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
 
-    // Initialize To Connector callback.
-    baseCallback = request.getToCallback();
+    Initializer initializer = getConnectorInitializer(jobRequest, direction);
+    InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
+    // Add job specific jars to
+    jobRequest.addJars(initializer.getJars(initializerContext,
+        jobRequest.getConnectorConnectionConfig(direction),
+        jobRequest.getConnectorJobConfig(direction)));
+  }
 
-    initializerClass = baseCallback
-        .getInitializer();
-    initializer = (Initializer) ClassUtils
-        .instantiate(initializerClass);
+  private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
+    Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
+    Class<? extends Initializer> initializerClass = transferable.getInitializer();
+    Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
 
     if (initializer == null) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-          "Can't create initializer instance: " + initializerClass.getName());
-    }
-
-    // Initializer context
-    initializerContext = new InitializerContext(request.getConnectorContext(Direction.TO));
-
-    // Initialize submission from fromConnector perspective
-    initializer.initialize(initializerContext,
-        request.getConnectorConnectionConfig(Direction.TO),
-        request.getConnectorJobConfig(Direction.TO));
-
-    // Add job specific jars to
-    request.addJars(initializer.getJars(initializerContext,
-        request.getConnectorConnectionConfig(Direction.TO),
-        request.getConnectorJobConfig(Direction.TO)));
-
-    // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
-
-    Schema toSchema = initializer.getSchema(initializerContext,
-            request.getConnectorConnectionConfig(Direction.TO),
-            request.getConnectorJobConfig(Direction.TO));
-
-    // Retrieve and persist the schema
-//    request.getSummary().setConnectorSchema(initializer.getSchema(
-//        initializerContext,
-//        request.getConnectorConnectionConfig(ConnectorType.TO),
-//        request.getConnectorJobConfig(ConnectorType.TO)
-//    ));
-
-    //TODO: Need better logic here
-    if (fromSchema != null)
-      request.getSummary().setConnectorSchema(fromSchema);
-    else
-      request.getSummary().setConnectorSchema(toSchema);
-
-    // Bootstrap job from framework perspective
-    prepareSubmission(request);
-
-    // Make sure that this job id is not currently running and submit the job
-    // only if it's not.
-    synchronized (getClass()) {
-      MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
-      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
-        throw new SqoopException(FrameworkError.FRAMEWORK_0002,
-          "Job with id " + jobId);
-      }
-
-      // @TODO(Abe): Call multiple destroyers.
-      // TODO(jarcec): We might need to catch all exceptions here to ensure
-      // that Destroyer will be executed in all cases.
-      boolean submitted = submissionEngine.submit(request);
-      if (!submitted) {
-        destroySubmission(request);
-        summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
-      }
-
-      repository.createSubmission(summary);
+          "Can't create connector initializer instance: " + initializerClass.getName());
     }
-
-    // Return job status most recent
-    return summary;
+    return initializer;
   }
 
-  private void prepareSubmission(SubmissionRequest request) {
-    JobConfiguration jobConfiguration = (JobConfiguration) request
-        .getConfigFrameworkJob();
+  private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) {
+    return new InitializerContext(jobRequest.getConnectorContext(direction));
+  }
 
+  void prepareJob(JobRequest request) {
+    JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob();
     // We're directly moving configured number of extractors and loaders to
     // underlying request object. In the future we might need to throttle this
     // count based on other running jobs to meet our SLAs.
@@ -531,19 +497,19 @@ public class JobManager implements Reconfigurable {
     request.setLoaders(jobConfiguration.throttling.loaders);
 
     // Delegate rest of the job to execution engine
-    executionEngine.prepareSubmission(request);
+    executionEngine.prepareJob(request);
   }
 
   /**
    * Callback that will be called only if we failed to submit the job to the
    * remote cluster.
    */
-  private void destroySubmission(SubmissionRequest request) {
-    CallbackBase fromCallback = request.getFromCallback();
-    CallbackBase toCallback = request.getToCallback();
+  void destroySubmission(JobRequest request) {
+    Transferable from = request.getFrom();
+    Transferable to = request.getTo();
 
-    Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer();
-    Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer();
+    Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
+    Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
     Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
     Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
 
@@ -557,15 +523,15 @@ public class JobManager implements Reconfigurable {
           "Can't create toDestroyer instance: " + toDestroyerClass.getName());
     }
 
-    // @TODO(Abe): Update context to manage multiple connectors. As well as summary.
+    // TODO(Abe): Update context to manage multiple connectors. As well as summary.
     DestroyerContext fromDestroyerContext = new DestroyerContext(
       request.getConnectorContext(Direction.FROM), false, request.getSummary()
-        .getConnectorSchema());
+        .getFromSchema());
     DestroyerContext toDestroyerContext = new DestroyerContext(
         request.getConnectorContext(Direction.TO), false, request.getSummary()
-        .getConnectorSchema());
+        .getToSchema());
 
-    // Initialize submission from connector perspective
+    // destroy submission from connector perspective
     fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM),
         request.getConnectorJobConfig(Direction.FROM));
     toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO),
@@ -573,42 +539,39 @@ public class JobManager implements Reconfigurable {
   }
 
   public MSubmission stop(long jobId, HttpEventContext ctx) {
-    String username = ctx.getUsername();
 
     Repository repository = RepositoryManager.getInstance().getRepository();
-    MSubmission submission = repository.findSubmissionLastForJob(jobId);
+    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
 
-    if (submission == null || !submission.getStatus().isRunning()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0003,
-        "Job with id " + jobId + " is not running");
+    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId
+          + " is not running");
     }
+    submissionEngine.stop(mSubmission.getExternalId());
 
-    String externalId = submission.getExternalId();
-    submissionEngine.stop(externalId);
-
-    submission.setLastUpdateUser(username);
+    mSubmission.setLastUpdateUser(ctx.getUsername());
 
     // Fetch new information to verify that the stop command has actually worked
-    update(submission);
+    update(mSubmission);
 
     // Return updated structure
-    return submission;
+    return mSubmission;
   }
 
   public MSubmission status(long jobId) {
     Repository repository = RepositoryManager.getInstance().getRepository();
-    MSubmission submission = repository.findSubmissionLastForJob(jobId);
+    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
 
-    if (submission == null) {
+    if (mSubmission == null) {
       return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
     }
 
     // If the submission is in running state, let's update it
-    if (submission.getStatus().isRunning()) {
-      update(submission);
+    if (mSubmission.getStatus().isRunning()) {
+      update(mSubmission);
     }
 
-    return submission;
+    return mSubmission;
   }
 
   private void update(MSubmission submission) {
@@ -744,4 +707,4 @@ public class JobManager implements Reconfigurable {
       LOG.info("Ending submission manager update thread");
     }
   }
-}
+}
\ No newline at end of file
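
For reference, the destroy path above only needs the connector to expose a Destroyer class through its Transferable (From/To); JobManager instantiates it reflectively and invokes destroy() with the connector's configuration objects. A minimal, purely illustrative sketch of such a connector-side destroyer follows; MyConnectionConfig and MyJobConfig are hypothetical placeholders for the connector's real configuration classes, and the generic Destroyer signature is assumed from the SPI rather than shown in this patch:

  import org.apache.sqoop.job.etl.Destroyer;
  import org.apache.sqoop.job.etl.DestroyerContext;

  // Hypothetical connector-side destroyer; the configuration classes are
  // placeholders, not part of this patch.
  public class MyConnectorDestroyer extends Destroyer<MyConnectionConfig, MyJobConfig> {
    @Override
    public void destroy(DestroyerContext context, MyConnectionConfig connection, MyJobConfig job) {
      // Release whatever the initializer set up for this submission,
      // e.g. staging directories or temporary tables.
    }
  }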

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
new file mode 100644
index 0000000..1f77693
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.DirectionError;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Job request class is used when creating a new job submission and contains
+ * all the information needed to create that submission (including mappers,
+ * reducers, ...).
+ */
+public class JobRequest {
+
+  /**
+   * Submission summary
+   */
+  MSubmission summary;
+
+  /**
+   * Original job name
+   */
+  String jobName;
+
+  /**
+   * Associated job (from metadata perspective) id
+   */
+  long jobId;
+
+  /**
+   * Connector instances associated with this submission request
+   */
+  SqoopConnector fromConnector;
+  SqoopConnector toConnector;
+
+  /**
+   * List of required local jars for the job
+   */
+  List<String> jars;
+
+  /**
+   * From entity
+   */
+  Transferable from;
+
+  /**
+   * To entity
+   */
+  Transferable to;
+
+  /**
+   * All configuration objects
+   */
+  Object fromConnectorConnectionConfig;
+  Object toConnectorConnectionConfig;
+  Object fromConnectorJobConfig;
+  Object toConnectorJobConfig;
+  Object fromFrameworkConnectionConfig;
+  Object toFrameworkConnectionConfig;
+  Object configFrameworkJob;
+
+  /**
+   * Connector context (submission specific configuration)
+   */
+  MutableMapContext fromConnectorContext;
+  MutableMapContext toConnectorContext;
+
+  /**
+   * Framework context (submission specific configuration)
+   */
+  MutableMapContext frameworkContext;
+
+  /**
+   * Optional notification URL for job progress
+   */
+  String notificationUrl;
+
+  /**
+   * Number of extractors
+   */
+  Integer extractors;
+
+  /**
+   * Number of loaders
+   */
+  Integer loaders;
+
+  /**
+   * The intermediate data format this submission should use.
+   */
+  Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
+  public JobRequest() {
+    this.jars = new LinkedList<String>();
+    this.fromConnectorContext = new MutableMapContext();
+    this.toConnectorContext = new MutableMapContext();
+    this.frameworkContext = new MutableMapContext();
+    this.fromConnector = null;
+    this.toConnector = null;
+    this.fromConnectorConnectionConfig = null;
+    this.toConnectorConnectionConfig = null;
+    this.fromConnectorJobConfig = null;
+    this.toConnectorJobConfig = null;
+    this.fromFrameworkConnectionConfig = null;
+    this.toFrameworkConnectionConfig = null;
+  }
+
+  public MSubmission getSummary() {
+    return summary;
+  }
+
+  public void setSummary(MSubmission summary) {
+    this.summary = summary;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(long jobId) {
+    this.jobId = jobId;
+  }
+
+  public SqoopConnector getConnector(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnector;
+
+      case TO:
+        return toConnector;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnector(Direction type, SqoopConnector connector) {
+    switch(type) {
+      case FROM:
+        fromConnector = connector;
+        break;
+
+      case TO:
+        toConnector = connector;
+        break;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public List<String> getJars() {
+    return jars;
+  }
+
+  public void addJar(String jar) {
+    if(!jars.contains(jar)) {
+      jars.add(jar);
+    }
+  }
+
+  public void addJarForClass(Class klass) {
+    addJar(ClassUtils.jarForClass(klass));
+  }
+
+  public void addJars(List<String> jars) {
+    for(String j : jars) {
+      addJar(j);
+    }
+  }
+
+  public Transferable getFrom() {
+    return from;
+  }
+
+  public void setFrom(Transferable from) {
+    this.from = from;
+  }
+
+  public Transferable getTo() {
+    return to;
+  }
+
+  public void setTo(Transferable to) {
+    this.to = to;
+  }
+
+  public Object getConnectorConnectionConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorConnectionConfig;
+
+      case TO:
+        return toConnectorConnectionConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnectorConnectionConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromConnectorConnectionConfig = config;
+        break;
+      case TO:
+        toConnectorConnectionConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getConnectorJobConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorJobConfig;
+
+      case TO:
+        return toConnectorJobConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnectorJobConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromConnectorJobConfig = config;
+        break;
+      case TO:
+        toConnectorJobConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getFrameworkConnectionConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromFrameworkConnectionConfig;
+
+      case TO:
+        return toFrameworkConnectionConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setFrameworkConnectionConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromFrameworkConnectionConfig = config;
+        break;
+      case TO:
+        toFrameworkConnectionConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getConfigFrameworkJob() {
+    return configFrameworkJob;
+  }
+
+  public void setConfigFrameworkJob(Object config) {
+    configFrameworkJob = config;
+  }
+
+  public MutableMapContext getConnectorContext(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorContext;
+
+      case TO:
+        return toConnectorContext;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public MutableMapContext getFrameworkContext() {
+    return frameworkContext;
+  }
+
+  public String getNotificationUrl() {
+    return notificationUrl;
+  }
+
+  public void setNotificationUrl(String url) {
+    this.notificationUrl = url;
+  }
+
+  public Integer getExtractors() {
+    return extractors;
+  }
+
+  public void setExtractors(Integer extractors) {
+    this.extractors = extractors;
+  }
+
+  public Integer getLoaders() {
+    return loaders;
+  }
+
+  public void setLoaders(Integer loaders) {
+    this.loaders = loaders;
+  }
+
+  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+    return intermediateDataFormat;
+  }
+
+  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+    this.intermediateDataFormat = intermediateDataFormat;
+  }
+
+}
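
To illustrate how the new class is meant to be consumed, here is a rough sketch of the framework assembling a JobRequest before handing it to the execution and submission engines. The fromConnector, toConnector, from and to variables stand for objects resolved elsewhere (repository, ConnectorManager) and are not part of this patch; the literal values are examples only:

  JobRequest request = new JobRequest();
  request.setJobId(10L);
  request.setJobName("example-transfer");
  request.setConnector(Direction.FROM, fromConnector);  // SqoopConnector driving the FROM side
  request.setConnector(Direction.TO, toConnector);      // SqoopConnector driving the TO side
  request.setFrom(from);                                // Transferable (From) supplied by the connector
  request.setTo(to);                                    // Transferable (To) supplied by the connector
  request.setExtractors(3);
  request.setLoaders(3);
  request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
  request.addJarForClass(fromConnector.getClass());     // ship the connector jar to the cluster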

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
index 3c0f6eb..732be3b 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
@@ -22,8 +22,8 @@ import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.submission.SubmissionStatus;
 
 /**
- * Submission engine is capable of executing and getting information about
- * submissions to remote (hadoop) cluster.
+ * Submission engine is responsible for conveying information about
+ * job instances (submissions) to the remote (Hadoop) cluster.
  */
 public abstract class SubmissionEngine {
 
@@ -31,6 +31,7 @@ public abstract class SubmissionEngine {
    * Initialize submission engine
    *
    * @param context Configuration context
+   * @param prefix Configuration prefix for the submission engine
    */
   public void initialize(MapContext context, String prefix) {
   }
@@ -57,7 +58,7 @@ public abstract class SubmissionEngine {
    *
    * @return Return true if we were able to submit job to remote cluster.
    */
-  public abstract boolean submit(SubmissionRequest submission);
+  public abstract boolean submit(JobRequest submission);
 
   /**
    * Hard stop for given submission.
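
The boolean return value keeps failure handling on the caller side simple. A hedged sketch of that caller-side contract, mirroring the JobManager hunks earlier in this commit (the FAILURE_ON_SUBMIT value is assumed to exist in SubmissionStatus):

  // If the engine could not hand the job to the cluster, tear down the
  // connector-side state and record the failure on the submission summary.
  if (!submissionEngine.submit(jobRequest)) {
    destroySubmission(jobRequest);
    jobRequest.getSummary().setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
  }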

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
deleted file mode 100644
index bf3f785..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.DirectionError;
-import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Submission details class is used when creating new submission and contains
- * all information that we need to create a new submission (including mappers,
- * reducers, ...).
- */
-public class SubmissionRequest {
-
-  /**
-   * Submission summary
-   */
-  MSubmission summary;
-
-  /**
-   * Original job name
-   */
-  String jobName;
-
-  /**
-   * Associated job (from metadata perspective) id
-   */
-  long jobId;
-
-  /**
-   * Connector instances associated with this submission request
-   */
-  SqoopConnector fromConnector;
-  SqoopConnector toConnector;
-
-  /**
-   * List of required local jars for the job
-   */
-  List<String> jars;
-
-  /**
-   * From connector callback
-   */
-  CallbackBase fromCallback;
-
-  /**
-   * To connector callback
-   */
-  CallbackBase toCallback;
-
-  /**
-   * All configuration objects
-   */
-  Object fromConnectorConnectionConfig;
-  Object toConnectorConnectionConfig;
-  Object fromConnectorJobConfig;
-  Object toConnectorJobConfig;
-  Object fromFrameworkConnectionConfig;
-  Object toFrameworkConnectionConfig;
-  Object configFrameworkJob;
-
-  /**
-   * Connector context (submission specific configuration)
-   */
-  MutableMapContext fromConnectorContext;
-  MutableMapContext toConnectorContext;
-
-  /**
-   * Framework context (submission specific configuration)
-   */
-  MutableMapContext frameworkContext;
-
-  /**
-   * HDFS output directory
-   */
-  String outputDirectory;
-
-  /**
-   * Optional notification URL for job progress
-   */
-  String notificationUrl;
-
-  /**
-   * Number of extractors
-   */
-  Integer extractors;
-
-  /**
-   * Number of loaders
-   */
-  Integer loaders;
-
-  /**
-   * The intermediate data format this submission should use.
-   */
-  Class<? extends IntermediateDataFormat> intermediateDataFormat;
-
-  public SubmissionRequest() {
-    this.jars = new LinkedList<String>();
-    this.fromConnectorContext = new MutableMapContext();
-    this.toConnectorContext = new MutableMapContext();
-    this.frameworkContext = new MutableMapContext();
-    this.fromConnector = null;
-    this.toConnector = null;
-    this.fromConnectorConnectionConfig = null;
-    this.toConnectorConnectionConfig = null;
-    this.fromConnectorJobConfig = null;
-    this.toConnectorJobConfig = null;
-    this.fromFrameworkConnectionConfig = null;
-    this.toFrameworkConnectionConfig = null;
-  }
-
-  public MSubmission getSummary() {
-    return summary;
-  }
-
-  public void setSummary(MSubmission summary) {
-    this.summary = summary;
-  }
-
-  public String getJobName() {
-    return jobName;
-  }
-
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
-  }
-
-  public long getJobId() {
-    return jobId;
-  }
-
-  public void setJobId(long jobId) {
-    this.jobId = jobId;
-  }
-
-  public SqoopConnector getConnector(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnector;
-
-      case TO:
-        return toConnector;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnector(Direction type, SqoopConnector connector) {
-    switch(type) {
-      case FROM:
-        fromConnector = connector;
-        break;
-
-      case TO:
-        toConnector = connector;
-        break;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public List<String> getJars() {
-    return jars;
-  }
-
-  public void addJar(String jar) {
-    if(!jars.contains(jar)) {
-      jars.add(jar);
-    }
-  }
-
-  public void addJarForClass(Class klass) {
-    addJar(ClassUtils.jarForClass(klass));
-  }
-
-  public void addJars(List<String> jars) {
-    for(String j : jars) {
-      addJar(j);
-    }
-  }
-
-  public CallbackBase getFromCallback() {
-    return fromCallback;
-  }
-
-  public void setFromCallback(CallbackBase fromCallback) {
-    this.fromCallback = fromCallback;
-  }
-
-  public CallbackBase getToCallback() {
-    return toCallback;
-  }
-
-  public void setToCallback(CallbackBase toCallback) {
-    this.toCallback = toCallback;
-  }
-
-  public Object getConnectorConnectionConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorConnectionConfig;
-
-      case TO:
-        return toConnectorConnectionConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnectorConnectionConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromConnectorConnectionConfig = config;
-        break;
-      case TO:
-        toConnectorConnectionConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getConnectorJobConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorJobConfig;
-
-      case TO:
-        return toConnectorJobConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnectorJobConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromConnectorJobConfig = config;
-        break;
-      case TO:
-        toConnectorJobConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getFrameworkConnectionConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromFrameworkConnectionConfig;
-
-      case TO:
-        return toFrameworkConnectionConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setFrameworkConnectionConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromFrameworkConnectionConfig = config;
-        break;
-      case TO:
-        toFrameworkConnectionConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getConfigFrameworkJob() {
-    return configFrameworkJob;
-  }
-
-  public void setConfigFrameworkJob(Object config) {
-    configFrameworkJob = config;
-  }
-
-  public MutableMapContext getConnectorContext(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorContext;
-
-      case TO:
-        return toConnectorContext;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public MutableMapContext getFrameworkContext() {
-    return frameworkContext;
-  }
-
-  public String getNotificationUrl() {
-    return notificationUrl;
-  }
-
-  public void setNotificationUrl(String url) {
-    this.notificationUrl = url;
-  }
-
-  public Integer getExtractors() {
-    return extractors;
-  }
-
-  public void setExtractors(Integer extractors) {
-    this.extractors = extractors;
-  }
-
-  public Integer getLoaders() {
-    return loaders;
-  }
-
-  public void setLoaders(Integer loaders) {
-    this.loaders = loaders;
-  }
-
-  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
-    return intermediateDataFormat;
-  }
-
-  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
-    this.intermediateDataFormat = intermediateDataFormat;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index 69c1b56..69dd028 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -17,64 +17,140 @@
  */
 package org.apache.sqoop.framework;
 
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.ValidationResult;
-import org.apache.sqoop.validation.ValidationRunner;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
- *
+ * NOTE(VB): This test class will soon be removed with the Validator refactoring
  */
 public class TestFrameworkValidator {
 
-  FrameworkValidator validator;
-
-  @Before
-  public void setUp() {
-    validator = new FrameworkValidator();
-  }
-
-  @Test
-  public void testConnectionValidation() {
-    ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
-    ValidationRunner runner = new ValidationRunner();
-    ValidationResult result = runner.validate(connectionConfiguration);
-    assertEquals(Status.FINE, result.getStatus());
-    assertEquals(0, result.getMessages().size());
-  }
-
-  @Test
-  public void testJobValidation() {
-    ValidationRunner runner = new ValidationRunner();
-    ValidationResult result;
-    JobConfiguration configuration;
-
-    // Empty form is allowed
-    configuration = new JobConfiguration();
-    result = runner.validate(configuration);
-    assertEquals(Status.FINE, result.getStatus());
-
-    // Explicitly setting extractors and loaders
-    configuration = new JobConfiguration();
-    configuration.throttling.extractors = 3;
-    configuration.throttling.loaders = 3;
-    result = runner.validate(configuration);
-    assertEquals(Status.FINE, result.getStatus());
-    assertEquals(0, result.getMessages().size());
-
-    // Negative and zero values for extractors and loaders
-//    configuration = new JobConfiguration();
+//  FrameworkValidator validator;
+//
+//  @Before
+//  public void setUp() {
+//    validator = new FrameworkValidator();
+//  }
+//
+//  @Test
+//  public void testConnectionValidation() {
+//    ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
+//
+//    Validation validation = validator.validateConnection(connectionConfiguration);
+//    assertEquals(Status.FINE, validation.getStatus());
+//    assertEquals(0, validation.getMessages().size());
+//  }
+//
+//  @Test
+//  public void testExportJobValidation() {
+//    ExportJobConfiguration configuration;
+//    Validation validation;
+//
+//    // Empty form is not allowed
+//    configuration = new ExportJobConfiguration();
+//    validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("input.inputDirectory")));
+//
+//    // Explicitly setting extractors and loaders
+//    configuration = new ExportJobConfiguration();
+//    configuration.input.inputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 3;
+//    configuration.throttling.loaders = 3;
+//
+//    validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+//    assertEquals(Status.FINE, validation.getStatus());
+//    assertEquals(0, validation.getMessages().size());
+//
+//    // Negative and zero values for extractors and loaders
+//    configuration = new ExportJobConfiguration();
+//    configuration.input.inputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 0;
+//    configuration.throttling.loaders = -1;
+//
+//    validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
+//  }
+//
+//
+//  @Test
+//  public void testImportJobValidation() {
+//    ImportJobConfiguration configuration;
+//    Validation validation;
+//
+//    // Empty form is not allowed
+//    configuration = new ImportJobConfiguration();
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.outputDirectory")));
+//
+//    // Explicitly setting extractors and loaders
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 3;
+//    configuration.throttling.loaders = 3;
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.FINE, validation.getStatus());
+//    assertEquals(0, validation.getMessages().size());
+//
+//    // Negative and zero values for extractors and loaders
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
 //    configuration.throttling.extractors = 0;
 //    configuration.throttling.loaders = -1;
-//    result = runner.validate(configuration);
-//    assertEquals(Status.FINE, result.getStatus());
-//    assertTrue(result.getMessages().containsKey("throttling.extractors"));
-//    assertTrue(result.getMessages().containsKey("throttling.loaders"));
-  }
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
+//
+//    // specifying both compression as well as customCompression is
+//    // unacceptable
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 2;
+//    configuration.throttling.loaders = 2;
+//    configuration.output.compression = OutputCompression.BZIP2;
+//    configuration.output.customCompression = "some.compression.codec";
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+//    // specifying a customCompression is fine
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 2;
+//    configuration.throttling.loaders = 2;
+//    configuration.output.compression = OutputCompression.CUSTOM;
+//    configuration.output.customCompression = "some.compression.codec";
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.FINE, validation.getStatus());
+//
+//    // specifying a customCompression without codec name is unacceptable
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 2;
+//    configuration.throttling.loaders = 2;
+//    configuration.output.compression = OutputCompression.CUSTOM;
+//    configuration.output.customCompression = "";
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+//    configuration = new ImportJobConfiguration();
+//    configuration.output.outputDirectory = "/czech/republic";
+//    configuration.throttling.extractors = 2;
+//    configuration.throttling.loaders = 2;
+//    configuration.output.compression = OutputCompression.CUSTOM;
+//    configuration.output.customCompression = null;
+//
+//    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+//    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+//    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+//  }
 }