You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:52 UTC
[23/52] [abbrv] git commit: SQOOP-1496: Sqoop2: Revisit/Refactor the
SubmissionEngine/ExecutionEngine APIs
SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d539dd4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d539dd4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d539dd4
Branch: refs/heads/SQOOP-1367
Commit: 3d539dd4d7477324dfe62a4e57f684351769b000
Parents: af25bcc
Author: Abraham Elmahrek <ab...@elmahrek.com>
Authored: Fri Sep 19 16:24:59 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:18 2014 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/json/SubmissionBean.java | 20 +-
.../org/apache/sqoop/model/MSubmission.java | 31 +-
.../apache/sqoop/json/TestSubmissionBean.java | 12 +-
.../sqoop/connector/jdbc/TestToInitializer.java | 1 -
.../idf/CSVIntermediateDataFormat.java | 4 -
.../connector/idf/IntermediateDataFormat.java | 4 -
.../idf/CSVIntermediateDataFormatTest.java | 222 ------
.../idf/TestCSVIntermediateDataFormat.java | 222 ++++++
.../sqoop/connector/ConnectorManager.java | 5 +-
.../apache/sqoop/framework/ExecutionEngine.java | 20 +-
.../org/apache/sqoop/framework/JobManager.java | 447 ++++++------
.../org/apache/sqoop/framework/JobRequest.java | 356 ++++++++++
.../sqoop/framework/SubmissionEngine.java | 7 +-
.../sqoop/framework/SubmissionRequest.java | 361 ----------
.../sqoop/framework/TestFrameworkValidator.java | 182 +++--
.../apache/sqoop/framework/TestJobManager.java | 173 +++++
.../apache/sqoop/framework/TestJobRequest.java | 71 ++
.../sqoop/framework/TestSubmissionRequest.java | 71 --
.../sqoop/repository/TestJdbcRepository.java | 694 +++++++++----------
.../sqoop/execution/mapreduce/MRJobRequest.java | 102 +++
.../mapreduce/MRSubmissionRequest.java | 102 ---
.../mapreduce/MapreduceExecutionEngine.java | 51 +-
.../apache/sqoop/job/mr/ConfigurationUtils.java | 3 -
.../apache/sqoop/job/mr/ProgressRunnable.java | 4 +-
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 7 +-
.../sqoop/job/mr/SqoopFileOutputFormat.java | 4 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 50 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 28 +-
.../sqoop/shell/utils/SubmissionDisplayer.java | 8 +-
.../org/apache/sqoop/job/etl/CallbackBase.java | 49 --
.../java/org/apache/sqoop/job/etl/From.java | 2 +-
.../main/java/org/apache/sqoop/job/etl/To.java | 2 +-
.../org/apache/sqoop/job/etl/Transferable.java | 51 ++
.../org/apache/sqoop/validation/Validator.java | 1 -
.../mapreduce/MapreduceSubmissionEngine.java | 12 +-
35 files changed, 1780 insertions(+), 1599 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
index 61d6576..9b1ae74 100644
--- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
@@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
private static final String EXCEPTION_TRACE = "exception-trace";
private static final String PROGRESS = "progress";
private static final String COUNTERS = "counters";
- private static final String CONNECTOR_SCHEMA = "schema-connector";
- private static final String HIO_SCHEMA = "schema-hio";
+ private static final String FROM_SCHEMA = "schema-from";
+ private static final String TO_SCHEMA = "schema-to";
private List<MSubmission> submissions;
@@ -116,11 +116,11 @@ public class SubmissionBean implements JsonBean {
if(submission.getCounters() != null) {
object.put(COUNTERS, extractCounters(submission.getCounters()));
}
- if(submission.getConnectorSchema() != null) {
- object.put(CONNECTOR_SCHEMA, extractSchema(submission.getConnectorSchema()));
+ if(submission.getFromSchema() != null) {
+ object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
}
- if(submission.getHioSchema() != null) {
- object.put(HIO_SCHEMA, extractSchema(submission.getHioSchema()));
+ if(submission.getToSchema() != null) {
+ object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
}
array.add(object);
@@ -188,11 +188,11 @@ public class SubmissionBean implements JsonBean {
if(object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
}
- if(object.containsKey(CONNECTOR_SCHEMA)) {
- submission.setConnectorSchema(restoreSchemna((JSONObject) object.get(CONNECTOR_SCHEMA)));
+ if(object.containsKey(FROM_SCHEMA)) {
+ submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA)));
}
- if(object.containsKey(HIO_SCHEMA)) {
- submission.setHioSchema(restoreSchemna((JSONObject) object.get(HIO_SCHEMA)));
+ if(object.containsKey(TO_SCHEMA)) {
+ submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA)));
}
this.submissions.add(submission);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/model/MSubmission.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
index 1edd6ee..ca21135 100644
--- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java
+++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
@@ -100,20 +100,21 @@ public class MSubmission extends MAccountableEntity {
String exceptionStackTrace;
/**
- * Schema that was reported by the connector.
+ * Schema for the FROM part of the job submission
*
* This property is required.
*/
- Schema connectorSchema;
+ Schema fromSchema;
/**
+ * Schema for the TO part of the job submission
* Optional schema that reported by the underlying I/O implementation. Please
- * note that this property might be empty and in such case the connector
- * schema will use also on Hadoop I/O side.
+ * note that this property might be empty, in which case the FROM schema is used
+ * on the TO side.
*
* This property is optional.
*/
- Schema hioSchema;
+ Schema toSchema;
public MSubmission() {
status = SubmissionStatus.UNKNOWN;
@@ -219,20 +220,20 @@ public class MSubmission extends MAccountableEntity {
this.setExceptionStackTrace(writer.toString());
}
- public Schema getConnectorSchema() {
- return connectorSchema;
+ public Schema getFromSchema() {
+ return fromSchema;
}
- public void setConnectorSchema(Schema connectorSchema) {
- this.connectorSchema = connectorSchema;
+ public void setFromSchema(Schema connectorSchema) {
+ this.fromSchema = connectorSchema;
}
- public Schema getHioSchema() {
- return hioSchema;
+ public Schema getToSchema() {
+ return toSchema;
}
- public void setHioSchema(Schema hioSchema) {
- this.hioSchema = hioSchema;
+ public void setToSchema(Schema hioSchema) {
+ this.toSchema = hioSchema;
}
@Override
@@ -248,8 +249,8 @@ public class MSubmission extends MAccountableEntity {
", externalLink='" + externalLink + '\'' +
", exceptionInfo='" + exceptionInfo + '\'' +
", exceptionStackTrace='" + exceptionStackTrace + '\'' +
- ", connectorSchema='" + connectorSchema + '\'' +
- ", hioSchema='" + hioSchema + '\'' +
+ ", fromSchema='" + fromSchema + '\'' +
+ ", toSchema='" + toSchema + '\'' +
'}';
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
index d87655e..518c9cb 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java
@@ -405,20 +405,20 @@ public class TestSubmissionBean extends TestCase {
assertEquals(222222, counter.getValue());
}
- public void testTransferConnectorSchema() {
+ public void testTransferFromSchema() {
MSubmission source = new MSubmission();
- source.setConnectorSchema(getSchema());
+ source.setFromSchema(getSchema());
- Schema target = transfer(source).getConnectorSchema();
+ Schema target = transfer(source).getFromSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}
- public void testTransferHioSchema() {
+ public void testTransferToSchema() {
MSubmission source = new MSubmission();
- source.setHioSchema(getSchema());
+ source.setToSchema(getSchema());
- Schema target = transfer(source).getHioSchema();
+ Schema target = transfer(source).getToSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
index eb6fcf1..4767215 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.ValidationResult;
import org.apache.sqoop.validation.ValidationRunner;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 1e8ab52..df5cb9c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -21,7 +21,6 @@ package org.apache.sqoop.connector.idf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
@@ -38,7 +37,6 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
-import java.util.regex.Pattern;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@@ -46,8 +44,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';
- private static final Logger LOG = Logger.getLogger
- (CSVIntermediateDataFormat.class);
private static final char[] originals = {
0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 91b594e..66d46a3 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -19,14 +19,10 @@
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.Type;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
/**
* Abstract class representing a pluggable intermediate data format the Sqoop
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
deleted file mode 100644
index df6d30f..0000000
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.connector.idf;
-
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Binary;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.Text;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class CSVIntermediateDataFormatTest {
-
- private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
-
- private IntermediateDataFormat<?> data;
-
- @Before
- public void setUp() {
- data = new CSVIntermediateDataFormat();
- }
-
- private String getByteFieldString(byte[] byteFieldData) {
- try {
- return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
- } catch(UnsupportedEncodingException e) {
- // Should never get to this point because ISO-8859-1 is a standard codec.
- return null;
- }
- }
-
- @Test
- public void testStringInStringOut() {
- String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
- + ",'" + String.valueOf(0x0A) + "'";
- data.setTextData(testData);
- assertEquals(testData, data.getTextData());
- }
-
- @Test
- public void testNullStringInObjectOut() {
- Schema schema = new Schema("test");
- schema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"))
- .addColumn(new Text("4"))
- .addColumn(new Binary("5"))
- .addColumn(new Text("6"));
- data.setSchema(schema);
- data.setTextData(null);
-
- Object[] out = data.getObjectData();
-
- assertNull(out);
- }
-
- @Test(expected=SqoopException.class)
- public void testEmptyStringInObjectOut() {
- Schema schema = new Schema("test");
- schema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"))
- .addColumn(new Text("4"))
- .addColumn(new Binary("5"))
- .addColumn(new Text("6"));
- data.setSchema(schema);
- data.setTextData("");
-
- data.getObjectData();
- }
-
- @Test
- public void testStringInObjectOut() {
-
- //byte[0] = -112, byte[1] = 54 - 2's complements
- String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
- + ",'\\n'";
- Schema schema = new Schema("test");
- schema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"))
- .addColumn(new Text("4"))
- .addColumn(new Binary("5"))
- .addColumn(new Text("6"));
- data.setSchema(schema);
- data.setTextData(testData);
-
- Object[] out = data.getObjectData();
-
- assertEquals(new Long(10),out[0]);
- assertEquals(new Long(34),out[1]);
- assertEquals("54",out[2]);
- assertEquals("random data",out[3]);
- assertEquals(-112, ((byte[])out[4])[0]);
- assertEquals(54, ((byte[])out[4])[1]);
- assertEquals("\n", out[5].toString());
- }
-
- @Test
- public void testObjectInStringOut() {
- Schema schema = new Schema("test");
- schema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"))
- .addColumn(new Text("4"))
- .addColumn(new Binary("5"))
- .addColumn(new Text("6"));
- data.setSchema(schema);
-
- byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
- Object[] in = new Object[6];
- in[0] = new Long(10);
- in[1] = new Long(34);
- in[2] = "54";
- in[3] = "random data";
- in[4] = byteFieldData;
- in[5] = new String(new char[] { 0x0A });
-
- data.setObjectData(in);
-
- //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
- String testData = "10,34,'54','random data'," +
- getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
- assertEquals(testData, data.getTextData());
- }
-
- @Test
- public void testObjectInObjectOut() {
- //Test escapable sequences too.
- //byte[0] = -112, byte[1] = 54 - 2's complements
- Schema schema = new Schema("test");
- schema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"))
- .addColumn(new Text("4"))
- .addColumn(new Binary("5"))
- .addColumn(new Text("6"));
- data.setSchema(schema);
-
- Object[] in = new Object[6];
- in[0] = new Long(10);
- in[1] = new Long(34);
- in[2] = "54";
- in[3] = "random data";
- in[4] = new byte[] { (byte) -112, (byte) 54};
- in[5] = new String(new char[] { 0x0A });
- Object[] inCopy = new Object[6];
- System.arraycopy(in,0,inCopy,0,in.length);
-
- // Modifies the input array, so we use the copy to confirm
- data.setObjectData(in);
-
- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
- }
-
- @Test
- public void testStringFullRangeOfCharacters() {
- Schema schema = new Schema("test");
- schema.addColumn(new Text("1"));
- data.setSchema(schema);
-
- char[] allCharArr = new char[256];
- for(int i = 0; i < allCharArr.length; ++i) {
- allCharArr[i] = (char)i;
- }
- String strData = new String(allCharArr);
-
- Object[] in = {strData};
- Object[] inCopy = new Object[1];
- System.arraycopy(in,0,inCopy,0,in.length);
-
- // Modifies the input array, so we use the copy to confirm
- data.setObjectData(in);
-
- assertEquals(strData, data.getObjectData()[0]);
- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
- }
-
- @Test
- public void testByteArrayFullRangeOfCharacters() {
- Schema schema = new Schema("test");
- schema.addColumn(new Binary("1"));
- data.setSchema(schema);
-
- byte[] allCharByteArr = new byte[256];
- for(int i = 0; i < allCharByteArr.length; ++i) {
- allCharByteArr[i] = (byte)i;
- }
-
- Object[] in = {allCharByteArr};
- Object[] inCopy = new Object[1];
- System.arraycopy(in,0,inCopy,0,in.length);
-
- // Modifies the input array, so we use the copy to confirm
- data.setObjectData(in);
- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
new file mode 100644
index 0000000..8c83a71
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestCSVIntermediateDataFormat {
+
+ private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
+
+ private IntermediateDataFormat<?> data;
+
+ @Before
+ public void setUp() {
+ data = new CSVIntermediateDataFormat();
+ }
+
+ private String getByteFieldString(byte[] byteFieldData) {
+ try {
+ return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
+ } catch(UnsupportedEncodingException e) {
+ // Should never get to this point because ISO-8859-1 is a standard codec.
+ return null;
+ }
+ }
+
+ @Test
+ public void testStringInStringOut() {
+ String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'" + String.valueOf(0x0A) + "'";
+ data.setTextData(testData);
+ assertEquals(testData, data.getTextData());
+ }
+
+ @Test
+ public void testNullStringInObjectOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData(null);
+
+ Object[] out = data.getObjectData();
+
+ assertNull(out);
+ }
+
+ @Test(expected=SqoopException.class)
+ public void testEmptyStringInObjectOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData("");
+
+ data.getObjectData();
+ }
+
+ @Test
+ public void testStringInObjectOut() {
+
+ //byte[0] = -112, byte[1] = 54 - 2's complements
+ String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'\\n'";
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData(testData);
+
+ Object[] out = data.getObjectData();
+
+ assertEquals(new Long(10),out[0]);
+ assertEquals(new Long(34),out[1]);
+ assertEquals("54",out[2]);
+ assertEquals("random data",out[3]);
+ assertEquals(-112, ((byte[])out[4])[0]);
+ assertEquals(54, ((byte[])out[4])[1]);
+ assertEquals("\n", out[5].toString());
+ }
+
+ @Test
+ public void testObjectInStringOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+
+ byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+ Object[] in = new Object[6];
+ in[0] = new Long(10);
+ in[1] = new Long(34);
+ in[2] = "54";
+ in[3] = "random data";
+ in[4] = byteFieldData;
+ in[5] = new String(new char[] { 0x0A });
+
+ data.setObjectData(in);
+
+ //byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complements
+ String testData = "10,34,'54','random data'," +
+ getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+ assertEquals(testData, data.getTextData());
+ }
+
+ @Test
+ public void testObjectInObjectOut() {
+ //Test escapable sequences too.
+ //byte[0] = -112, byte[1] = 54 - 2's complements
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+
+ Object[] in = new Object[6];
+ in[0] = new Long(10);
+ in[1] = new Long(34);
+ in[2] = "54";
+ in[3] = "random data";
+ in[4] = new byte[] { (byte) -112, (byte) 54};
+ in[5] = new String(new char[] { 0x0A });
+ Object[] inCopy = new Object[6];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+
+ @Test
+ public void testStringFullRangeOfCharacters() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new Text("1"));
+ data.setSchema(schema);
+
+ char[] allCharArr = new char[256];
+ for(int i = 0; i < allCharArr.length; ++i) {
+ allCharArr[i] = (char)i;
+ }
+ String strData = new String(allCharArr);
+
+ Object[] in = {strData};
+ Object[] inCopy = new Object[1];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+
+ assertEquals(strData, data.getObjectData()[0]);
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+
+ @Test
+ public void testByteArrayFullRangeOfCharacters() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new Binary("1"));
+ data.setSchema(schema);
+
+ byte[] allCharByteArr = new byte[256];
+ for(int i = 0; i < allCharByteArr.length; ++i) {
+ allCharByteArr[i] = (byte)i;
+ }
+
+ Object[] in = {allCharByteArr};
+ Object[] inCopy = new Object[1];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index db6f579..c87df84 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -114,10 +114,9 @@ public class ConnectorManager implements Reconfigurable {
return bundles;
}
- public ResourceBundle getResourceBundle(long connectorId,
- Locale locale) {
+ public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
- return handler.getConnector().getBundle(locale);
+ return handler.getConnector().getBundle(locale);
}
public MConnector getConnectorMetadata(long connectorId) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index 96ec148..75b570d 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -18,12 +18,11 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MSubmission;
/**
- * Execution engine drive execution of sqoop submission (job). It's responsible
+ * Execution engine drives execution of a Sqoop job. It's responsible
* for executing all defined steps in the import/export workflow.
+ * A successful job execution will be recorded in the job submission entity.
*/
public abstract class ExecutionEngine {
@@ -31,6 +30,7 @@ public abstract class ExecutionEngine {
* Initialize execution engine
*
* @param context Configuration context
+ * @param prefix Execution engine prefix
*/
public void initialize(ImmutableContext context, String prefix) {
}
@@ -42,19 +42,19 @@ public abstract class ExecutionEngine {
}
/**
- * Return new SubmissionRequest class or any subclass if it's needed by
+ * Return new JobRequest class or any subclass if it's needed by
* execution and submission engine combination.
*
- * @return New Submission request object
+ * @return new JobRequest object
*/
- public SubmissionRequest createSubmissionRequest() {
- return new SubmissionRequest();
+ public JobRequest createJobRequest() {
+ return new JobRequest();
}
/**
- * Prepare given submission request.
+ * Prepare given job request.
*
- * @param request Submission request
+ * @param request JobRequest
*/
- public abstract void prepareSubmission(SubmissionRequest request);
+ public abstract void prepareJob(JobRequest request);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index b1b37f6..8149d1c 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -1,3 +1,5 @@
+
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -266,264 +268,228 @@ public class JobManager implements Reconfigurable {
}
public MSubmission submit(long jobId, HttpEventContext ctx) {
- String username = ctx.getUsername();
-
- Repository repository = RepositoryManager.getInstance().getRepository();
-
- MJob job = repository.findJob(jobId);
- if (job == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0004,
- "Unknown job id " + jobId);
- }
- if (!job.getEnabled()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0009,
- "Job id: " + job.getPersistenceId());
+ MSubmission mSubmission = createJobSubmission(ctx, jobId);
+ JobRequest jobRequest = createJobRequest(jobId, mSubmission);
+ // Bootstrap job to execute
+ prepareJob(jobRequest);
+ // Make sure that this job id is not currently running and submit the job
+ // only if it's not.
+ synchronized (getClass()) {
+ MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
+ .findSubmissionLastForJob(jobId);
+ if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId);
+ }
+ // TODO(Abe): Call multiple destroyers.
+ // TODO(jarcec): We might need to catch all exceptions here to ensure
+ // that Destroyer will be executed in all cases.
+ // NOTE: the following is a blocking call
+ boolean success = submissionEngine.submit(jobRequest);
+ if (!success) {
+ destroySubmission(jobRequest);
+ mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
+ }
+ RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
}
+ return mSubmission;
+ }
- MConnection fromConnection = repository.findConnection(job.getConnectionId(Direction.FROM));
- MConnection toConnection = repository.findConnection(job.getConnectionId(Direction.TO));
+ private JobRequest createJobRequest(long jobId, MSubmission submission) {
+ // get job
+ MJob job = getJob(jobId);
- if (!fromConnection.getEnabled()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0010,
- "Connection id: " + fromConnection.getPersistenceId());
- }
+ // get from/to connections for the job
+ MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM));
+ MConnection toConnection = getConnection(job.getConnectionId(Direction.TO));
- if (!toConnection.getEnabled()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0010,
- "Connection id: " + toConnection.getPersistenceId());
- }
+ // get from/to connectors for the connection
+ SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+ validateSupportedDirection(fromConnector, Direction.FROM);
+ SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+ validateSupportedDirection(toConnector, Direction.TO);
- SqoopConnector fromConnector =
- ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM));
- SqoopConnector toConnector =
- ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO));
+ // Transform config to fromConnector specific classes
+ Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
+ .getConnectionConfigurationClass());
+ FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
- // Make sure that connectors support the directions they will be used from.
- if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0011,
- "Connector: " + fromConnector.getClass().getCanonicalName());
- }
+ // Transform config to toConnector specific classes
+ Object toConnectorConfig = ClassUtils
+ .instantiate(toConnector.getConnectionConfigurationClass());
+ FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
- if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0011,
- "Connector: " + toConnector.getClass().getCanonicalName());
- }
+ Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
+ FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
- // Transform forms to fromConnector specific classes
- Object fromConnectorConnection = ClassUtils.instantiate(
- fromConnector.getConnectionConfigurationClass());
- FormUtils.fromForms(fromConnection.getConnectorPart().getForms(),
- fromConnectorConnection);
-
- Object fromJob = ClassUtils.instantiate(
- fromConnector.getJobConfigurationClass(Direction.FROM));
- FormUtils.fromForms(
- job.getConnectorPart(Direction.FROM).getForms(), fromJob);
-
- // Transform forms to toConnector specific classes
- Object toConnectorConnection = ClassUtils.instantiate(
- toConnector.getConnectionConfigurationClass());
- FormUtils.fromForms(toConnection.getConnectorPart().getForms(),
- toConnectorConnection);
-
- Object toJob = ClassUtils.instantiate(
- toConnector.getJobConfigurationClass(Direction.TO));
+ Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
- // Transform framework specific forms
- Object fromFrameworkConnection = ClassUtils.instantiate(
- FrameworkManager.getInstance().getConnectionConfigurationClass());
- Object toFrameworkConnection = ClassUtils.instantiate(
- FrameworkManager.getInstance().getConnectionConfigurationClass());
- FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(),
- fromFrameworkConnection);
- FormUtils.fromForms(toConnection.getFrameworkPart().getForms(),
- toFrameworkConnection);
-
- Object frameworkJob = ClassUtils.instantiate(
- FrameworkManager.getInstance().getJobConfigurationClass());
+ // Transform framework specific configs
+ // Q(VB) : Aren't the following 2 exactly the same?
+ Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
+ .getConnectionConfigurationClass());
+ FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection);
+
+ Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
+ .getConnectionConfigurationClass());
+ FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection);
+
+ Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance()
+ .getJobConfigurationClass());
FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
- // Create request object
- MSubmission summary = new MSubmission(jobId);
- SubmissionRequest request = executionEngine.createSubmissionRequest();
-
- summary.setCreationUser(username);
- summary.setLastUpdateUser(username);
-
- // Save important variables to the submission request
- request.setSummary(summary);
- request.setConnector(Direction.FROM, fromConnector);
- request.setConnector(Direction.TO, toConnector);
- request.setConnectorConnectionConfig(Direction.FROM, fromConnectorConnection);
- request.setConnectorConnectionConfig(Direction.TO, toConnectorConnection);
- request.setConnectorJobConfig(Direction.FROM, fromJob);
- request.setConnectorJobConfig(Direction.TO, toJob);
- // @TODO(Abe): Should we actually have 2 different Framework Connection config objects?
- request.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
- request.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
- request.setConfigFrameworkJob(frameworkJob);
- request.setJobName(job.getName());
- request.setJobId(job.getPersistenceId());
- request.setNotificationUrl(notificationBaseUrl + jobId);
+ // Create a job request for submit/execution
+ JobRequest jobRequest = executionEngine.createJobRequest();
+ // Save important variables to the job request
+ jobRequest.setSummary(submission);
+ jobRequest.setConnector(Direction.FROM, fromConnector);
+ jobRequest.setConnector(Direction.TO, toConnector);
+ jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig);
+ jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig);
+ jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
+ jobRequest.setConnectorJobConfig(Direction.TO, toJob);
+ // TODO(Abe): Should we actually have 2 different Framework Connection config objects?
+ jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
+ jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
+ jobRequest.setConfigFrameworkJob(frameworkJob);
+ jobRequest.setJobName(job.getName());
+ jobRequest.setJobId(job.getPersistenceId());
+ jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass =
fromConnector.getIntermediateDataFormat();
- request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
- // Create request object
+ jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
+
+
+ jobRequest.setFrom(fromConnector.getFrom());
+ jobRequest.setTo(toConnector.getTo());
+
+ addStandardJars(jobRequest);
+ addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+ addConnectorInitializerJars(jobRequest, Direction.FROM);
+ addConnectorInitializerJars(jobRequest, Direction.TO);
+
+ Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
+ Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
+
+ // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
+ if (fromSchema != null) {
+ jobRequest.getSummary().setFromSchema(fromSchema);
+ }
+ else {
+ jobRequest.getSummary().setFromSchema(toSchema);
+ }
+ LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
+ return jobRequest;
+ }
+
+ private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
+ SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
+ jobRequest.addJarForClass(fromConnector.getClass());
+ jobRequest.addJarForClass(toConnector.getClass());
+ jobRequest.addJarForClass(dataFormatClass);
+ }
+ private void addStandardJars(JobRequest jobRequest) {
// Let's register all important jars
// sqoop-common
- request.addJarForClass(MapContext.class);
+ jobRequest.addJarForClass(MapContext.class);
// sqoop-core
- request.addJarForClass(FrameworkManager.class);
+ jobRequest.addJarForClass(FrameworkManager.class);
// sqoop-spi
- request.addJarForClass(SqoopConnector.class);
+ jobRequest.addJarForClass(SqoopConnector.class);
// Execution engine jar
- request.addJarForClass(executionEngine.getClass());
- // Connectors in use
- request.addJarForClass(fromConnector.getClass());
- request.addJarForClass(toConnector.getClass());
-
+ jobRequest.addJarForClass(executionEngine.getClass());
// Extra libraries that Sqoop code requires
- request.addJarForClass(JSONValue.class);
-
- // The IDF is used in the ETL process.
- request.addJarForClass(dataFormatClass);
-
-
- // Get callbacks
- request.setFromCallback(fromConnector.getFrom());
- request.setToCallback(toConnector.getTo());
- LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback());
-
- // Initialize submission from fromConnector perspective
- CallbackBase[] baseCallbacks = {
- request.getFromCallback(),
- request.getToCallback()
- };
+ jobRequest.addJarForClass(JSONValue.class);
+ }
- CallbackBase baseCallback;
- Class<? extends Initializer> initializerClass;
- Initializer initializer;
- InitializerContext initializerContext;
+ MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
+ MSubmission summary = new MSubmission(jobId);
+ summary.setCreationUser(ctx.getUsername());
+ summary.setLastUpdateUser(ctx.getUsername());
+ return summary;
+ }
- // Initialize From Connector callback.
- baseCallback = request.getFromCallback();
+ SqoopConnector getConnector(long connnectorId) {
+ return ConnectorManager.getInstance().getConnector(connnectorId);
+ }
- initializerClass = baseCallback
- .getInitializer();
- initializer = (Initializer) ClassUtils
- .instantiate(initializerClass);
+ void validateSupportedDirection(SqoopConnector connector, Direction direction) {
+ // Make sure that connector supports the given direction
+ if (!connector.getSupportedDirections().contains(direction)) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
+ + connector.getClass().getCanonicalName());
+ }
+ }
- if (initializer == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create initializer instance: " + initializerClass.getName());
+ MConnection getConnection(long connectionId) {
+ MConnection connection = RepositoryManager.getInstance().getRepository()
+ .findConnection(connectionId);
+ if (!connection.getEnabled()) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
+ + connection.getPersistenceId());
}
+ return connection;
+ }
- // Initializer context
- initializerContext = new InitializerContext(request.getConnectorContext(Direction.FROM));
+ MJob getJob(long jobId) {
+ MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
+ if (job == null) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId);
+ }
- // Initialize submission from fromConnector perspective
- initializer.initialize(initializerContext,
- request.getConnectorConnectionConfig(Direction.FROM),
- request.getConnectorJobConfig(Direction.FROM));
+ if (!job.getEnabled()) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId());
+ }
+ return job;
+ }
+
+ private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
- // Add job specific jars to
- request.addJars(initializer.getJars(initializerContext,
- request.getConnectorConnectionConfig(Direction.FROM),
- request.getConnectorJobConfig(Direction.FROM)));
+ Initializer initializer = getConnectorInitializer(jobRequest, direction);
- // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
+ // Initializer context
+ InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
+ // Initialize submission from the connector perspective
+ initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction),
+ jobRequest.getConnectorJobConfig(direction));
- Schema fromSchema = initializer.getSchema(initializerContext,
- request.getConnectorConnectionConfig(Direction.FROM),
- request.getConnectorJobConfig(Direction.FROM));
+ // TODO(Abe): Alter behavior of Schema here.
+ return initializer.getSchema(initializerContext,
+ jobRequest.getConnectorConnectionConfig(direction),
+ jobRequest.getConnectorJobConfig(direction));
+ }
- // request.getSummary().setConnectorSchema(initializer.getSchema(
- // initializerContext,
- // request.getConnectorConnectionConfig(ConnectorType.FROM),
- // request.getConnectorJobConfig(ConnectorType.FROM)
- // ));
+ private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
- // Initialize To Connector callback.
- baseCallback = request.getToCallback();
+ Initializer initializer = getConnectorInitializer(jobRequest, direction);
+ InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
+ // Add job specific jars to the job request
+ jobRequest.addJars(initializer.getJars(initializerContext,
+ jobRequest.getConnectorConnectionConfig(direction),
+ jobRequest.getConnectorJobConfig(direction)));
+ }
- initializerClass = baseCallback
- .getInitializer();
- initializer = (Initializer) ClassUtils
- .instantiate(initializerClass);
+ private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
+ Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
+ Class<? extends Initializer> initializerClass = transferable.getInitializer();
+ Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
if (initializer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create initializer instance: " + initializerClass.getName());
- }
-
- // Initializer context
- initializerContext = new InitializerContext(request.getConnectorContext(Direction.TO));
-
- // Initialize submission from fromConnector perspective
- initializer.initialize(initializerContext,
- request.getConnectorConnectionConfig(Direction.TO),
- request.getConnectorJobConfig(Direction.TO));
-
- // Add job specific jars to
- request.addJars(initializer.getJars(initializerContext,
- request.getConnectorConnectionConfig(Direction.TO),
- request.getConnectorJobConfig(Direction.TO)));
-
- // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
-
- Schema toSchema = initializer.getSchema(initializerContext,
- request.getConnectorConnectionConfig(Direction.TO),
- request.getConnectorJobConfig(Direction.TO));
-
- // Retrieve and persist the schema
-// request.getSummary().setConnectorSchema(initializer.getSchema(
-// initializerContext,
-// request.getConnectorConnectionConfig(ConnectorType.TO),
-// request.getConnectorJobConfig(ConnectorType.TO)
-// ));
-
- //TODO: Need better logic here
- if (fromSchema != null)
- request.getSummary().setConnectorSchema(fromSchema);
- else
- request.getSummary().setConnectorSchema(toSchema);
-
- // Bootstrap job from framework perspective
- prepareSubmission(request);
-
- // Make sure that this job id is not currently running and submit the job
- // only if it's not.
- synchronized (getClass()) {
- MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
- if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0002,
- "Job with id " + jobId);
- }
-
- // @TODO(Abe): Call multiple destroyers.
- // TODO(jarcec): We might need to catch all exceptions here to ensure
- // that Destroyer will be executed in all cases.
- boolean submitted = submissionEngine.submit(request);
- if (!submitted) {
- destroySubmission(request);
- summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
- }
-
- repository.createSubmission(summary);
+ "Can't create connector initializer instance: " + initializerClass.getName());
}
-
- // Return job status most recent
- return summary;
+ return initializer;
}
- private void prepareSubmission(SubmissionRequest request) {
- JobConfiguration jobConfiguration = (JobConfiguration) request
- .getConfigFrameworkJob();
+ private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) {
+ return new InitializerContext(jobRequest.getConnectorContext(direction));
+ }
+ void prepareJob(JobRequest request) {
+ JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob();
// We're directly moving configured number of extractors and loaders to
// underlying request object. In the future we might need to throttle this
// count based on other running jobs to meet our SLAs.
@@ -531,19 +497,19 @@ public class JobManager implements Reconfigurable {
request.setLoaders(jobConfiguration.throttling.loaders);
// Delegate rest of the job to execution engine
- executionEngine.prepareSubmission(request);
+ executionEngine.prepareJob(request);
}
/**
* Callback that will be called only if we failed to submit the job to the
* remote cluster.
*/
- private void destroySubmission(SubmissionRequest request) {
- CallbackBase fromCallback = request.getFromCallback();
- CallbackBase toCallback = request.getToCallback();
+ void destroySubmission(JobRequest request) {
+ Transferable from = request.getFrom();
+ Transferable to = request.getTo();
- Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer();
- Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer();
+ Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
+ Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
@@ -557,15 +523,15 @@ public class JobManager implements Reconfigurable {
"Can't create toDestroyer instance: " + toDestroyerClass.getName());
}
- // @TODO(Abe): Update context to manage multiple connectors. As well as summary.
+ // TODO(Abe): Update context to manage multiple connectors. As well as summary.
DestroyerContext fromDestroyerContext = new DestroyerContext(
request.getConnectorContext(Direction.FROM), false, request.getSummary()
- .getConnectorSchema());
+ .getFromSchema());
DestroyerContext toDestroyerContext = new DestroyerContext(
request.getConnectorContext(Direction.TO), false, request.getSummary()
- .getConnectorSchema());
+ .getToSchema());
- // Initialize submission from connector perspective
+ // destroy submission from connector perspective
fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM));
toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO),
@@ -573,42 +539,39 @@ public class JobManager implements Reconfigurable {
}
public MSubmission stop(long jobId, HttpEventContext ctx) {
- String username = ctx.getUsername();
Repository repository = RepositoryManager.getInstance().getRepository();
- MSubmission submission = repository.findSubmissionLastForJob(jobId);
+ MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
- if (submission == null || !submission.getStatus().isRunning()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0003,
- "Job with id " + jobId + " is not running");
+ if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId
+ + " is not running");
}
+ submissionEngine.stop(mSubmission.getExternalId());
- String externalId = submission.getExternalId();
- submissionEngine.stop(externalId);
-
- submission.setLastUpdateUser(username);
+ mSubmission.setLastUpdateUser(ctx.getUsername());
// Fetch new information to verify that the stop command has actually worked
- update(submission);
+ update(mSubmission);
// Return updated structure
- return submission;
+ return mSubmission;
}
public MSubmission status(long jobId) {
Repository repository = RepositoryManager.getInstance().getRepository();
- MSubmission submission = repository.findSubmissionLastForJob(jobId);
+ MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
- if (submission == null) {
+ if (mSubmission == null) {
return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
}
// If the submission is in running state, let's update it
- if (submission.getStatus().isRunning()) {
- update(submission);
+ if (mSubmission.getStatus().isRunning()) {
+ update(mSubmission);
}
- return submission;
+ return mSubmission;
}
private void update(MSubmission submission) {
@@ -744,4 +707,4 @@ public class JobManager implements Reconfigurable {
LOG.info("Ending submission manager update thread");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
new file mode 100644
index 0000000..1f77693
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.DirectionError;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Submission details class is used when creating new submission and contains
+ * all information that we need to create a new submission (including mappers,
+ * reducers, ...).
+ */
+public class JobRequest {
+
+ /**
+ * Submission summary
+ */
+ MSubmission summary;
+
+ /**
+ * Original job name
+ */
+ String jobName;
+
+ /**
+ * Associated job (from metadata perspective) id
+ */
+ long jobId;
+
+ /**
+ * Connector instances associated with this submission request
+ */
+ SqoopConnector fromConnector;
+ SqoopConnector toConnector;
+
+ /**
+ * List of required local jars for the job
+ */
+ List<String> jars;
+
+ /**
+ * From entity
+ */
+ Transferable from;
+
+ /**
+ * To entity
+ */
+ Transferable to;
+
+ /**
+ * All configuration objects
+ */
+ Object fromConnectorConnectionConfig;
+ Object toConnectorConnectionConfig;
+ Object fromConnectorJobConfig;
+ Object toConnectorJobConfig;
+ Object fromFrameworkConnectionConfig;
+ Object toFrameworkConnectionConfig;
+ Object configFrameworkJob;
+
+ /**
+ * Connector context (submission specific configuration)
+ */
+ MutableMapContext fromConnectorContext;
+ MutableMapContext toConnectorContext;
+
+ /**
+ * Framework context (submission specific configuration)
+ */
+ MutableMapContext frameworkContext;
+
+ /**
+ * Optional notification URL for job progress
+ */
+ String notificationUrl;
+
+ /**
+ * Number of extractors
+ */
+ Integer extractors;
+
+ /**
+ * Number of loaders
+ */
+ Integer loaders;
+
+ /**
+ * The intermediate data format this submission should use.
+ */
+ Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
+ public JobRequest() {
+ this.jars = new LinkedList<String>();
+ this.fromConnectorContext = new MutableMapContext();
+ this.toConnectorContext = new MutableMapContext();
+ this.frameworkContext = new MutableMapContext();
+ this.fromConnector = null;
+ this.toConnector = null;
+ this.fromConnectorConnectionConfig = null;
+ this.toConnectorConnectionConfig = null;
+ this.fromConnectorJobConfig = null;
+ this.toConnectorJobConfig = null;
+ this.fromFrameworkConnectionConfig = null;
+ this.toFrameworkConnectionConfig = null;
+ }
+
+ public MSubmission getSummary() {
+ return summary;
+ }
+
+ public void setSummary(MSubmission summary) {
+ this.summary = summary;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(long jobId) {
+ this.jobId = jobId;
+ }
+
+ public SqoopConnector getConnector(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnector;
+
+ case TO:
+ return toConnector;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnector(Direction type, SqoopConnector connector) {
+ switch(type) {
+ case FROM:
+ fromConnector = connector;
+ break;
+
+ case TO:
+ toConnector = connector;
+ break;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public List<String> getJars() {
+ return jars;
+ }
+
+ public void addJar(String jar) {
+ if(!jars.contains(jar)) {
+ jars.add(jar);
+ }
+ }
+
+ public void addJarForClass(Class klass) {
+ addJar(ClassUtils.jarForClass(klass));
+ }
+
+ public void addJars(List<String> jars) {
+ for(String j : jars) {
+ addJar(j);
+ }
+ }
+
+ public Transferable getFrom() {
+ return from;
+ }
+
+ public void setFrom(Transferable from) {
+ this.from = from;
+ }
+
+ public Transferable getTo() {
+ return to;
+ }
+
+ public void setTo(Transferable to) {
+ this.to = to;
+ }
+
+ public Object getConnectorConnectionConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorConnectionConfig;
+
+ case TO:
+ return toConnectorConnectionConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnectorConnectionConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromConnectorConnectionConfig = config;
+ break;
+ case TO:
+ toConnectorConnectionConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getConnectorJobConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorJobConfig;
+
+ case TO:
+ return toConnectorJobConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnectorJobConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromConnectorJobConfig = config;
+ break;
+ case TO:
+ toConnectorJobConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getFrameworkConnectionConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromFrameworkConnectionConfig;
+
+ case TO:
+ return toFrameworkConnectionConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setFrameworkConnectionConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromFrameworkConnectionConfig = config;
+ break;
+ case TO:
+ toFrameworkConnectionConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getConfigFrameworkJob() {
+ return configFrameworkJob;
+ }
+
+ public void setConfigFrameworkJob(Object config) {
+ configFrameworkJob = config;
+ }
+
+ public MutableMapContext getConnectorContext(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorContext;
+
+ case TO:
+ return toConnectorContext;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public MutableMapContext getFrameworkContext() {
+ return frameworkContext;
+ }
+
+ public String getNotificationUrl() {
+ return notificationUrl;
+ }
+
+ public void setNotificationUrl(String url) {
+ this.notificationUrl = url;
+ }
+
+ public Integer getExtractors() {
+ return extractors;
+ }
+
+ public void setExtractors(Integer extractors) {
+ this.extractors = extractors;
+ }
+
+ public Integer getLoaders() {
+ return loaders;
+ }
+
+ public void setLoaders(Integer loaders) {
+ this.loaders = loaders;
+ }
+
+ public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+ return intermediateDataFormat;
+ }
+
+ public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+ this.intermediateDataFormat = intermediateDataFormat;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
index 3c0f6eb..732be3b 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
@@ -22,8 +22,8 @@ import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.submission.SubmissionStatus;
/**
- * Submission engine is capable of executing and getting information about
- * submissions to remote (hadoop) cluster.
+ * Submission engine is responsible for conveying the information about the
+ * job instances (submissions) to the remote (Hadoop) cluster.
*/
public abstract class SubmissionEngine {
@@ -31,6 +31,7 @@ public abstract class SubmissionEngine {
* Initialize submission engine
*
* @param context Configuration context
+ * @param prefix Submission engine prefix
*/
public void initialize(MapContext context, String prefix) {
}
@@ -57,7 +58,7 @@ public abstract class SubmissionEngine {
*
* @return Return true if we were able to submit job to remote cluster.
*/
- public abstract boolean submit(SubmissionRequest submission);
+ public abstract boolean submit(JobRequest submission);
/**
* Hard stop for given submission.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
deleted file mode 100644
index bf3f785..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.DirectionError;
-import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Submission details class is used when creating new submission and contains
- * all information that we need to create a new submission (including mappers,
- * reducers, ...).
- */
-public class SubmissionRequest {
-
- /**
- * Submission summary
- */
- MSubmission summary;
-
- /**
- * Original job name
- */
- String jobName;
-
- /**
- * Associated job (from metadata perspective) id
- */
- long jobId;
-
- /**
- * Connector instances associated with this submission request
- */
- SqoopConnector fromConnector;
- SqoopConnector toConnector;
-
- /**
- * List of required local jars for the job
- */
- List<String> jars;
-
- /**
- * From connector callback
- */
- CallbackBase fromCallback;
-
- /**
- * To connector callback
- */
- CallbackBase toCallback;
-
- /**
- * All configuration objects
- */
- Object fromConnectorConnectionConfig;
- Object toConnectorConnectionConfig;
- Object fromConnectorJobConfig;
- Object toConnectorJobConfig;
- Object fromFrameworkConnectionConfig;
- Object toFrameworkConnectionConfig;
- Object configFrameworkJob;
-
- /**
- * Connector context (submission specific configuration)
- */
- MutableMapContext fromConnectorContext;
- MutableMapContext toConnectorContext;
-
- /**
- * Framework context (submission specific configuration)
- */
- MutableMapContext frameworkContext;
-
- /**
- * HDFS output directory
- */
- String outputDirectory;
-
- /**
- * Optional notification URL for job progress
- */
- String notificationUrl;
-
- /**
- * Number of extractors
- */
- Integer extractors;
-
- /**
- * Number of loaders
- */
- Integer loaders;
-
- /**
- * The intermediate data format this submission should use.
- */
- Class<? extends IntermediateDataFormat> intermediateDataFormat;
-
- public SubmissionRequest() {
- this.jars = new LinkedList<String>();
- this.fromConnectorContext = new MutableMapContext();
- this.toConnectorContext = new MutableMapContext();
- this.frameworkContext = new MutableMapContext();
- this.fromConnector = null;
- this.toConnector = null;
- this.fromConnectorConnectionConfig = null;
- this.toConnectorConnectionConfig = null;
- this.fromConnectorJobConfig = null;
- this.toConnectorJobConfig = null;
- this.fromFrameworkConnectionConfig = null;
- this.toFrameworkConnectionConfig = null;
- }
-
- public MSubmission getSummary() {
- return summary;
- }
-
- public void setSummary(MSubmission summary) {
- this.summary = summary;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public void setJobId(long jobId) {
- this.jobId = jobId;
- }
-
- public SqoopConnector getConnector(Direction type) {
- switch(type) {
- case FROM:
- return fromConnector;
-
- case TO:
- return toConnector;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnector(Direction type, SqoopConnector connector) {
- switch(type) {
- case FROM:
- fromConnector = connector;
- break;
-
- case TO:
- toConnector = connector;
- break;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public List<String> getJars() {
- return jars;
- }
-
- public void addJar(String jar) {
- if(!jars.contains(jar)) {
- jars.add(jar);
- }
- }
-
- public void addJarForClass(Class klass) {
- addJar(ClassUtils.jarForClass(klass));
- }
-
- public void addJars(List<String> jars) {
- for(String j : jars) {
- addJar(j);
- }
- }
-
- public CallbackBase getFromCallback() {
- return fromCallback;
- }
-
- public void setFromCallback(CallbackBase fromCallback) {
- this.fromCallback = fromCallback;
- }
-
- public CallbackBase getToCallback() {
- return toCallback;
- }
-
- public void setToCallback(CallbackBase toCallback) {
- this.toCallback = toCallback;
- }
-
- public Object getConnectorConnectionConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorConnectionConfig;
-
- case TO:
- return toConnectorConnectionConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnectorConnectionConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromConnectorConnectionConfig = config;
- break;
- case TO:
- toConnectorConnectionConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getConnectorJobConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorJobConfig;
-
- case TO:
- return toConnectorJobConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnectorJobConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromConnectorJobConfig = config;
- break;
- case TO:
- toConnectorJobConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getFrameworkConnectionConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromFrameworkConnectionConfig;
-
- case TO:
- return toFrameworkConnectionConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setFrameworkConnectionConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromFrameworkConnectionConfig = config;
- break;
- case TO:
- toFrameworkConnectionConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getConfigFrameworkJob() {
- return configFrameworkJob;
- }
-
- public void setConfigFrameworkJob(Object config) {
- configFrameworkJob = config;
- }
-
- public MutableMapContext getConnectorContext(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorContext;
-
- case TO:
- return toConnectorContext;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public MutableMapContext getFrameworkContext() {
- return frameworkContext;
- }
-
- public String getNotificationUrl() {
- return notificationUrl;
- }
-
- public void setNotificationUrl(String url) {
- this.notificationUrl = url;
- }
-
- public Integer getExtractors() {
- return extractors;
- }
-
- public void setExtractors(Integer extractors) {
- this.extractors = extractors;
- }
-
- public Integer getLoaders() {
- return loaders;
- }
-
- public void setLoaders(Integer loaders) {
- this.loaders = loaders;
- }
-
- public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
- return intermediateDataFormat;
- }
-
- public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
- this.intermediateDataFormat = intermediateDataFormat;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index 69c1b56..69dd028 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -17,64 +17,140 @@
*/
package org.apache.sqoop.framework;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.ValidationResult;
-import org.apache.sqoop.validation.ValidationRunner;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
- *
+ * NOTE(VB): This test class will soon be removed with the Validator refactoring
*/
public class TestFrameworkValidator {
- FrameworkValidator validator;
-
- @Before
- public void setUp() {
- validator = new FrameworkValidator();
- }
-
- @Test
- public void testConnectionValidation() {
- ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
- ValidationRunner runner = new ValidationRunner();
- ValidationResult result = runner.validate(connectionConfiguration);
- assertEquals(Status.FINE, result.getStatus());
- assertEquals(0, result.getMessages().size());
- }
-
- @Test
- public void testJobValidation() {
- ValidationRunner runner = new ValidationRunner();
- ValidationResult result;
- JobConfiguration configuration;
-
- // Empty form is allowed
- configuration = new JobConfiguration();
- result = runner.validate(configuration);
- assertEquals(Status.FINE, result.getStatus());
-
- // Explicitly setting extractors and loaders
- configuration = new JobConfiguration();
- configuration.throttling.extractors = 3;
- configuration.throttling.loaders = 3;
- result = runner.validate(configuration);
- assertEquals(Status.FINE, result.getStatus());
- assertEquals(0, result.getMessages().size());
-
- // Negative and zero values for extractors and loaders
-// configuration = new JobConfiguration();
+// FrameworkValidator validator;
+//
+// @Before
+// public void setUp() {
+// validator = new FrameworkValidator();
+// }
+//
+// @Test
+// public void testConnectionValidation() {
+// ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
+//
+// Validation validation = validator.validateConnection(connectionConfiguration);
+// assertEquals(Status.FINE, validation.getStatus());
+// assertEquals(0, validation.getMessages().size());
+// }
+//
+// @Test
+// public void testExportJobValidation() {
+// ExportJobConfiguration configuration;
+// Validation validation;
+//
+// // Empty form is not allowed
+// configuration = new ExportJobConfiguration();
+// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("input.inputDirectory")));
+//
+// // Explicitly setting extractors and loaders
+// configuration = new ExportJobConfiguration();
+// configuration.input.inputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 3;
+// configuration.throttling.loaders = 3;
+//
+// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+// assertEquals(Status.FINE, validation.getStatus());
+// assertEquals(0, validation.getMessages().size());
+//
+// // Negative and zero values for extractors and loaders
+// configuration = new ExportJobConfiguration();
+// configuration.input.inputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 0;
+// configuration.throttling.loaders = -1;
+//
+// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
+// }
+//
+//
+// @Test
+// public void testImportJobValidation() {
+// ImportJobConfiguration configuration;
+// Validation validation;
+//
+// // Empty form is not allowed
+// configuration = new ImportJobConfiguration();
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.outputDirectory")));
+//
+// // Explicitly setting extractors and loaders
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 3;
+// configuration.throttling.loaders = 3;
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.FINE, validation.getStatus());
+// assertEquals(0, validation.getMessages().size());
+//
+// // Negative and zero values for extractors and loaders
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 0;
// configuration.throttling.loaders = -1;
-// result = runner.validate(configuration);
-// assertEquals(Status.FINE, result.getStatus());
-// assertTrue(result.getMessages().containsKey("throttling.extractors"));
-// assertTrue(result.getMessages().containsKey("throttling.loaders"));
- }
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
+//
+// // specifying both compression as well as customCompression is
+// // unacceptable
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 2;
+// configuration.throttling.loaders = 2;
+// configuration.output.compression = OutputCompression.BZIP2;
+// configuration.output.customCompression = "some.compression.codec";
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+// // specifying a customCompression is fine
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 2;
+// configuration.throttling.loaders = 2;
+// configuration.output.compression = OutputCompression.CUSTOM;
+// configuration.output.customCompression = "some.compression.codec";
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.FINE, validation.getStatus());
+//
+// // specifying a customCompression without codec name is unacceptable
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 2;
+// configuration.throttling.loaders = 2;
+// configuration.output.compression = OutputCompression.CUSTOM;
+// configuration.output.customCompression = "";
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+// configuration = new ImportJobConfiguration();
+// configuration.output.outputDirectory = "/czech/republic";
+// configuration.throttling.extractors = 2;
+// configuration.throttling.loaders = 2;
+// configuration.output.compression = OutputCompression.CUSTOM;
+// configuration.output.customCompression = null;
+//
+// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+//
+// }
}