Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:01:06 UTC
[03/19] git commit: SQOOP-777: Sqoop2: Implement intermediate data format representation policy
SQOOP-777: Sqoop2: Implement intermediate data format representation policy
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3c93930b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3c93930b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3c93930b
Branch: refs/heads/SQOOP-1367
Commit: 3c93930bf3d35a3910541ec3099b44c32bf7adf7
Parents: 17c7219
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Jul 26 11:37:50 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Jul 26 11:37:50 2014 -0700
----------------------------------------------------------------------
common/pom.xml | 5 +
.../org/apache/sqoop/etl/io/DataReader.java | 25 +-
.../org/apache/sqoop/etl/io/DataWriter.java | 20 +-
.../org/apache/sqoop/schema/type/Column.java | 5 +
.../connector/jdbc/GenericJdbcConnector.java | 4 +-
.../jdbc/GenericJdbcConnectorError.java | 2 +
.../jdbc/GenericJdbcExportInitializer.java | 53 ++-
.../jdbc/GenericJdbcImportInitializer.java | 7 +-
.../sqoop/connector/jdbc/TestExportLoader.java | 10 +-
.../connector/jdbc/TestImportExtractor.java | 9 +-
.../connector/jdbc/TestImportInitializer.java | 2 +-
connector/connector-sdk/pom.xml | 6 +
.../idf/CSVIntermediateDataFormat.java | 355 +++++++++++++++++++
.../connector/idf/IntermediateDataFormat.java | 143 ++++++++
.../idf/IntermediateDataFormatError.java | 57 +++
.../idf/CSVIntermediateDataFormatTest.java | 222 ++++++++++++
.../org/apache/sqoop/framework/JobManager.java | 8 +
.../sqoop/framework/SubmissionRequest.java | 15 +
execution/mapreduce/pom.xml | 4 +
.../mapreduce/MapreduceExecutionEngine.java | 61 ++--
.../java/org/apache/sqoop/job/JobConstants.java | 4 +
.../sqoop/job/etl/HdfsExportExtractor.java | 12 +-
.../sqoop/job/etl/HdfsSequenceImportLoader.java | 10 +-
.../sqoop/job/etl/HdfsTextImportLoader.java | 12 +-
.../org/apache/sqoop/job/io/SqoopWritable.java | 59 +++
.../sqoop/job/mr/SqoopFileOutputFormat.java | 7 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 47 +--
.../sqoop/job/mr/SqoopNullOutputFormat.java | 6 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 85 +++--
.../org/apache/sqoop/job/mr/SqoopReducer.java | 4 +-
.../mapreduce/MapreduceExecutionEngineTest.java | 3 +
.../java/org/apache/sqoop/job/JobUtils.java | 14 +-
.../org/apache/sqoop/job/TestHdfsExtract.java | 121 +++----
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 58 ++-
.../org/apache/sqoop/job/TestMapReduce.java | 47 ++-
.../apache/sqoop/job/io/SqoopWritableTest.java | 91 +++++
.../mr/TestSqoopOutputFormatLoadExecutor.java | 54 +--
pom.xml | 11 +-
spi/pom.xml | 5 +
.../sqoop/connector/spi/SqoopConnector.java | 12 +
.../mapreduce/MapreduceSubmissionEngine.java | 1 +
41 files changed, 1408 insertions(+), 268 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index db11b5b..9bfa07d 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -45,6 +45,11 @@ limitations under the License.
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
index 3e1adc7..a34dfb4 100644
--- a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
@@ -18,17 +18,32 @@
package org.apache.sqoop.etl.io;
/**
- * An intermediate layer for passing data from the MR framework
+ * An intermediate layer for passing data from the execution framework
* to the ETL framework.
*/
public abstract class DataReader {
+ /**
+ * Read data from the execution framework as an object array.
+ * @return - array of objects with each column represented as an object
+ * @throws Exception
+ */
public abstract Object[] readArrayRecord() throws Exception;
- public abstract String readCsvRecord() throws Exception;
+ /**
+ * Read data from the execution framework as text, i.e. as a CSV record.
+ * @return - CSV formatted data.
+ * @throws Exception
+ */
+ public abstract String readTextRecord() throws Exception;
- public abstract Object readContent(int type) throws Exception;
-
- public abstract void setFieldDelimiter(char fieldDelimiter);
+ /**
+ * Read data from the execution framework in its native format.
+ * @return - the content in the native format of the intermediate data
+ * format being used.
+ * @throws Exception
+ */
+ public abstract Object readContent() throws Exception;
}
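
For orientation, a loader built against this revised API polls the reader until it returns null. The sketch below is not part of this commit - LoggingLoader is an invented name and the sink is a placeholder - but it shows the intended consumption pattern for readTextRecord():

// Minimal sketch of a loader consuming the new DataReader API.
// LoggingLoader is hypothetical; only the DataReader calls come from this commit.
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

public class LoggingLoader extends Loader {
  @Override
  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
    DataReader reader = context.getDataReader();
    String record;
    // readTextRecord() is expected to return null once no more rows remain.
    while ((record = reader.readTextRecord()) != null) {
      System.out.println(record); // placeholder sink
    }
  }
}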
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
index d81364e..2166b09 100644
--- a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
@@ -23,12 +23,24 @@ package org.apache.sqoop.etl.io;
*/
public abstract class DataWriter {
+ /**
+ * Write an array of objects into the execution framework.
+ * @param array - data to be written
+ */
public abstract void writeArrayRecord(Object[] array);
- public abstract void writeCsvRecord(String csv);
+ /**
+ * Write data into the execution framework as text. The intermediate data
+ * format may choose to convert the data to another representation,
+ * depending on how the format is implemented.
+ * @param text - data represented as CSV text.
+ */
+ public abstract void writeStringRecord(String text);
- public abstract void writeContent(Object content, int type);
-
- public abstract void setFieldDelimiter(char fieldDelimiter);
+ /**
+ * Write data in the intermediate data format's native format.
+ * @param obj - data to be written
+ */
+ public abstract void writeRecord(Object obj);
}
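
On the producing side, an extractor obtains a DataWriter from its context and emits one row per call; writeArrayRecord() hands the row over as objects for the intermediate data format to escape and join, while writeStringRecord() passes an already-formatted CSV line through. A small sketch under those assumptions (the helper class is invented here):

// Hypothetical helper showing the renamed DataWriter calls; not from this commit.
import org.apache.sqoop.etl.io.DataWriter;

public class RowEmitter {
  void emit(DataWriter writer) {
    // One Object per column; the IDF is responsible for escaping and joining.
    writer.writeArrayRecord(new Object[] { 10L, "random data", new byte[] { 0x01 } });
    // Or hand over a pre-formatted CSV record unchanged.
    writer.writeStringRecord("10,'random data'");
  }
}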
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
index 8b630b2..30c26a3 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
@@ -98,4 +98,9 @@ public abstract class Column {
result = 31 * result + (nullable != null ? nullable.hashCode() : 0);
return result;
}
+
+ public boolean validate(Object o) {
+ // TODO: Implement this in all subclasses!
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index e0da80f..298288e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -21,6 +21,8 @@ import java.util.Locale;
import java.util.ResourceBundle;
import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
@@ -61,7 +63,7 @@ public class GenericJdbcConnector extends SqoopConnector {
@Override
public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle(
- GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
+ GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 2b1a0ad..c374750 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -78,6 +78,8 @@ public enum GenericJdbcConnectorError implements ErrorCode {
GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
"stage table to destination table."),
+
+ GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.")
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index ef39cdc..80253be 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -17,6 +17,9 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
@@ -26,9 +29,11 @@ import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@@ -58,7 +63,53 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
@Override
public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
- return null;
+ configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration);
+
+ String schemaName = exportJobConfiguration.table.tableName;
+
+ if (schemaName == null) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
+ "Table name extraction not supported yet.");
+ }
+
+ if(exportJobConfiguration.table.schemaName != null) {
+ schemaName = exportJobConfiguration.table.schemaName + "." + schemaName;
+ }
+
+ Schema schema = new Schema(schemaName);
+ ResultSet rs = null;
+ ResultSetMetaData rsmt = null;
+ try {
+ rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
+
+ rsmt = rs.getMetaData();
+ for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
+ Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+ String columnName = rsmt.getColumnName(i);
+ if (columnName == null || columnName.equals("")) {
+ columnName = rsmt.getColumnLabel(i);
+ if (null == columnName) {
+ columnName = "Column " + i;
+ }
+ }
+
+ column.setName(columnName);
+ schema.addColumn(column);
+ }
+
+ return schema;
+ } catch (SQLException e) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+ } finally {
+ if(rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ LOG.info("Ignoring exception while closing ResultSet", e);
+ }
+ }
+ }
}
private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 96818ba..2ad3cb2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -71,16 +71,17 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
String schemaName = importJobConfiguration.table.tableName;
if(schemaName == null) {
schemaName = "Query";
+ } else if(importJobConfiguration.table.schemaName != null) {
+ schemaName = importJobConfiguration.table.schemaName + "." + schemaName;
}
Schema schema = new Schema(schemaName);
-
ResultSet rs = null;
ResultSetMetaData rsmt = null;
try {
rs = executor.executeQuery(
- context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
- .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+ context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
+ .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
);
rsmt = rs.getMetaData();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index d4c4565..fc3ddd0 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -113,11 +113,6 @@ public class TestExportLoader {
int index = 0;
@Override
- public void setFieldDelimiter(char fieldDelimiter) {
- // do nothing and use default delimiter
- }
-
- @Override
public Object[] readArrayRecord() {
if (index < numberOfRows) {
Object[] array = new Object[] {
@@ -132,16 +127,17 @@ public class TestExportLoader {
}
@Override
- public String readCsvRecord() {
+ public String readTextRecord() {
fail("This method should not be invoked.");
return null;
}
@Override
- public Object readContent(int type) {
+ public Object readContent() throws Exception {
fail("This method should not be invoked.");
return null;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index a7ed6ba..30d0b9a 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -134,11 +134,6 @@ public class TestImportExtractor extends TestCase {
int indx = START;
@Override
- public void setFieldDelimiter(char fieldDelimiter) {
- // do nothing and use default delimiter
- }
-
- @Override
public void writeArrayRecord(Object[] array) {
for (int i = 0; i < array.length; i++) {
if (array[i] instanceof Integer) {
@@ -153,12 +148,12 @@ public class TestImportExtractor extends TestCase {
}
@Override
- public void writeCsvRecord(String csv) {
+ public void writeStringRecord(String text) {
fail("This method should not be invoked.");
}
@Override
- public void writeContent(Object content, int type) {
+ public void writeRecord(Object content) {
fail("This method should not be invoked.");
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index a33fa36..cd05e30 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -327,7 +327,7 @@ public class TestImportInitializer extends TestCase {
Initializer initializer = new GenericJdbcImportInitializer();
initializer.initialize(initializerContext, connConf, jobConf);
Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
- assertEquals(getSchema(tableName), schema);
+ assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 4056e14..f54837d 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -38,6 +38,12 @@ limitations under the License.
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-common</artifactId>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
new file mode 100644
index 0000000..39d48c7
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
+
+ public static final char SEPARATOR_CHARACTER = ',';
+ public static final char ESCAPE_CHARACTER = '\\';
+ public static final char QUOTE_CHARACTER = '\'';
+
+ private static final Logger LOG = Logger.getLogger
+ (CSVIntermediateDataFormat.class);
+
+ private static final char[] originals = {
+ 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
+ };
+
+ private static final String[] replacements = {
+ new String(new char[] { ESCAPE_CHARACTER, '\\'}),
+ new String(new char[] { ESCAPE_CHARACTER, '0'}),
+ new String(new char[] { ESCAPE_CHARACTER, 'n'}),
+ new String(new char[] { ESCAPE_CHARACTER, 'r'}),
+ new String(new char[] { ESCAPE_CHARACTER, 'Z'}),
+ new String(new char[] { ESCAPE_CHARACTER, '\"'}),
+ new String(new char[] { ESCAPE_CHARACTER, '\''})
+ };
+
+ // ISO-8859-1 is an 8-bit charset that every Java implementation is required to support.
+ public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
+
+ private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
+ private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
+
+ private Schema schema;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getTextData() {
+ return data;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setTextData(String text) {
+ this.data = text;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setSchema(Schema schema) {
+ if(schema == null) {
+ return;
+ }
+ this.schema = schema;
+ List<Column> columns = schema.getColumns();
+ int i = 0;
+ for(Column col : columns) {
+ if(col.getType() == Type.TEXT) {
+ stringFieldIndices.add(i);
+ } else if(col.getType() == Type.BINARY) {
+ byteFieldIndices.add(i);
+ }
+ i++;
+ }
+ }
+
+ /**
+ * Custom CSV parser that honors quoting and escaped quotes.
+ * All other escaping is handled elsewhere.
+ *
+ * @return String[] - the record split into fields, with quotes and escapes intact
+ */
+ private String[] getFields() {
+ if (data == null) {
+ return null;
+ }
+
+ boolean quoted = false;
+ boolean escaped = false;
+ List<String> parsedData = new LinkedList<String>();
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < data.length(); ++i) {
+ char c = data.charAt(i);
+ switch(c) {
+ case QUOTE_CHARACTER:
+ buffer.append(c);
+ if (escaped) {
+ escaped = false;
+ } else {
+ quoted = !quoted;
+ }
+ break;
+
+ case ESCAPE_CHARACTER:
+ buffer.append(ESCAPE_CHARACTER);
+ escaped = !escaped;
+ break;
+
+ case SEPARATOR_CHARACTER:
+ if (quoted) {
+ buffer.append(c);
+ } else {
+ parsedData.add(buffer.toString());
+ buffer = new StringBuffer();
+ }
+ break;
+
+ default:
+ if (escaped) {
+ escaped = false;
+ }
+ buffer.append(c);
+ break;
+ }
+ }
+ parsedData.add(buffer.toString());
+
+ return parsedData.toArray(new String[parsedData.size()]);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Object[] getObjectData() {
+ String[] fields = getFields();
+ if (fields == null) {
+ return null;
+ }
+
+ if (fields.length != schema.getColumns().size()) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+ "The data " + getTextData() + " has the wrong number of fields.");
+ }
+
+ Object[] out = new Object[fields.length];
+ Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
+ for (int i = 0; i < fields.length; i++) {
+ Type colType = cols[i].getType();
+ if (fields[i].equals("NULL")) {
+ out[i] = null;
+ continue;
+ }
+ if (colType == Type.TEXT) {
+ out[i] = unescapeStrings(fields[i]);
+ } else if (colType == Type.BINARY) {
+ out[i] = unescapeByteArray(fields[i]);
+ } else if (colType == Type.FIXED_POINT) {
+ Long byteSize = ((FixedPoint) cols[i]).getByteSize();
+ if (byteSize != null && byteSize <= Integer.SIZE) {
+ out[i] = Integer.valueOf(fields[i]);
+ } else {
+ out[i] = Long.valueOf(fields[i]);
+ }
+ } else if (colType == Type.FLOATING_POINT) {
+ Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
+ if (byteSize != null && byteSize <= Float.SIZE) {
+ out[i] = Float.valueOf(fields[i]);
+ } else {
+ out[i] = Double.valueOf(fields[i]);
+ }
+ } else if (colType == Type.DECIMAL) {
+ out[i] = new BigDecimal(fields[i]);
+ } else {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
+ }
+ }
+ return out;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void setObjectData(Object[] data) {
+ escapeArray(data);
+ this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(this.data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void read(DataInput in) throws IOException {
+ data = in.readUTF();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object other) {
+ if(this == other) {
+ return true;
+ }
+ if(other == null || !(other instanceof CSVIntermediateDataFormat)) {
+ return false;
+ }
+ return data.equals(((CSVIntermediateDataFormat)other).data);
+ }
+
+ public int compareTo(IntermediateDataFormat<?> o) {
+ if(this == o) {
+ return 0;
+ }
+ if(this.equals(o)) {
+ return 0;
+ }
+ if(!(o instanceof CSVIntermediateDataFormat)) {
+ throw new IllegalStateException("Expected Data to be instance of " +
+ "CSVIntermediateFormat, but was an instance of " + o.getClass()
+ .getName());
+ }
+ return data.compareTo(o.getTextData());
+ }
+
+ /**
+ * Escape the string and byte-array fields of the given row in place so
+ * that the row can be joined into a CSV record.
+ *
+ * @param array - one row of data whose fields are escaped in place
+ */
+ private void escapeArray(Object[] array) {
+ for (int i : stringFieldIndices) {
+ array[i] = escapeStrings((String) array[i]);
+ }
+ for (int i : byteFieldIndices) {
+ array[i] = escapeByteArrays((byte[]) array[i]);
+ }
+ }
+
+ private String escapeByteArrays(byte[] bytes) {
+ try {
+ return escapeStrings(new String(bytes, BYTE_FIELD_CHARSET));
+ } catch (UnsupportedEncodingException e) {
+ // We should never hit this case.
+ // This character set should be distributed with Java.
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
+ }
+ }
+
+ private String getRegExp(char orig) {
+ return getRegExp(String.valueOf(orig));
+ }
+
+ private String getRegExp(String orig) {
+ return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
+ }
+
+ private String escapeStrings(String orig) {
+ int j = 0;
+ String replacement = orig;
+ try {
+ for (j = 0; j < replacements.length; j++) {
+ replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
+ }
+ } catch (Exception e) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage());
+ }
+ StringBuilder builder = new StringBuilder();
+ builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER);
+ return builder.toString();
+ }
+
+ private String unescapeStrings(String orig) {
+ // Remove the leading and trailing quotes.
+ orig = orig.substring(1, orig.length() - 1);
+ int j = 0;
+ try {
+ for (j = 0; j < replacements.length; j++) {
+ orig = orig.replaceAll(getRegExp(replacements[j]),
+ Matcher.quoteReplacement(String.valueOf(originals[j])));
+ }
+ } catch (Exception e) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig + " " + String.valueOf(j) + e.getMessage());
+ }
+
+ return orig;
+ }
+
+ private byte[] unescapeByteArray(String orig) {
+ // Always encoded in BYTE_FIELD_CHARSET.
+ try {
+ return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET);
+ } catch (UnsupportedEncodingException e) {
+ // Should never hit this case.
+ // This character set should be distributed with Java.
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
+ }
+ }
+
+ public String toString() {
+ return data;
+ }
+}
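
To see the escaping and schema-driven parsing above in one place, the round trip below (patterned after the unit tests later in this patch) sets a schema, writes an object row, and reads back both the CSV text and the parsed objects. A sketch assuming only this commit's classes:

// Round-trip sketch for CSVIntermediateDataFormat; output comments are illustrative.
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvIdfRoundTrip {
  public static void main(String[] args) {
    CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat();
    Schema schema = new Schema("demo");
    schema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));
    idf.setSchema(schema); // records which field indices need string escaping

    idf.setObjectData(new Object[] { 10L, "line one\nline two" });
    System.out.println(idf.getTextData());  // e.g. 10,'line one\nline two'

    Object[] row = idf.getObjectData();     // parsed back per the schema
    System.out.println(row[1]);             // original string, newline restored
  }
}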
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
new file mode 100644
index 0000000..91b594e
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.Type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract class representing a pluggable intermediate data format the Sqoop
+ * framework will use to move data to/from the connector. All intermediate
+ * data formats are expected to have an internal/native implementation,
+ * but also should minimally be able to return a text (CSV) version of the
+ * data. The data format should also be able to return the data as an object
+ * array - each array representing one row.
+ * <p/>
+ * Why a "native" internal format and then return text too?
+ * Imagine a connector that moves data from a system that stores data as a
+ * serialization format called FooFormat. If I also need the data to be
+ * written into HDFS as FooFormat, the additional cycles burnt in converting
+ * the FooFormat to text and back are wasted - so plugging in an intermediate
+ * format that can store the data as FooFormat saves those cycles!
+ * <p/>
+ * Most fast access mechanisms, like mysqldump or pg_dump, write the data
+ * out as CSV, and most often the destination data is also represented as
+ * CSV - so minimal CSV support is important, letting us easily pull the
+ * data out as text.
+ * <p/>
+ * Any conversion to the final format from the native or text format is to be
+ * done by the connector or OutputFormat classes.
+ *
+ * @param <T> - Each data format may have a native representation of the
+ * data, represented by the parameter.
+ */
+public abstract class IntermediateDataFormat<T> {
+
+ protected volatile T data;
+
+ public int hashCode() {
+ return data.hashCode();
+ }
+
+ /**
+ * Set one row of data.
+ *
+ * @param data - A single row of data to be moved.
+ */
+ public void setData(T data) {
+ this.data = data;
+ }
+
+ /**
+ * Get one row of data.
+ *
+ * @return - One row of data, represented in the internal/native format of
+ * the intermediate data format implementation.
+ */
+ public T getData() {
+ return data;
+ }
+
+ /**
+ * Get one row of data as CSV.
+ *
+ * @return - String representing the data in CSV
+ */
+ public abstract String getTextData();
+
+ /**
+ * Set one row of data as CSV.
+ *
+ */
+ public abstract void setTextData(String text);
+
+ /**
+ * Get one row of data as an Object array.
+ *
+ * @return - the data represented as an Object array, one element per column
+ */
+ public abstract Object[] getObjectData();
+
+ /**
+ * Set one row of data as an Object array.
+ *
+ */
+ public abstract void setObjectData(Object[] data);
+
+ /**
+ * Set the schema to be used.
+ *
+ * @param schema - the schema to be used
+ */
+ public abstract void setSchema(Schema schema);
+
+ /**
+ * Get the schema of the data.
+ *
+ * @return - The schema of the data.
+ */
+ public abstract Schema getSchema();
+
+ /**
+ * Serialize the fields of this object to <code>out</code>.
+ *
+ * @param out <code>DataOutput</code> to serialize this object into.
+ * @throws IOException
+ */
+ public abstract void write(DataOutput out) throws IOException;
+
+ /**
+ * Deserialize the fields of this object from <code>in</code>.
+ *
+ * <p>For efficiency, implementations should attempt to re-use storage in the
+ * existing object where possible.</p>
+ *
+ * @param in <code>DataInput</code> to deserialize this object from.
+ * @throws IOException
+ */
+ public abstract void read(DataInput in) throws IOException;
+}
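
The Javadoc above defines the pluggable contract; a connector shipping its own serialization would subclass it along these lines. A bare sketch for a hypothetical FooFormat (all names here are invented), keeping a String-backed native type for brevity:

// Hypothetical IDF subclass; FooFormat and its pipe-delimited encoding are invented.
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FooIntermediateDataFormat extends IntermediateDataFormat<String> {
  private Schema schema;

  @Override public String getTextData() { return data; }
  @Override public void setTextData(String text) { this.data = text; }
  @Override public Object[] getObjectData() { return data.split("\\|"); } // toy parsing
  @Override public void setObjectData(Object[] row) {
    // Join the row into FooFormat; a real format would escape per the schema.
    StringBuilder sb = new StringBuilder();
    for (Object o : row) { if (sb.length() > 0) sb.append('|'); sb.append(o); }
    this.data = sb.toString();
  }
  @Override public void setSchema(Schema schema) { this.schema = schema; }
  @Override public Schema getSchema() { return schema; }
  @Override public void write(DataOutput out) throws IOException { out.writeUTF(data); }
  @Override public void read(DataInput in) throws IOException { data = in.readUTF(); }
}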
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
new file mode 100644
index 0000000..9219074
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum IntermediateDataFormatError implements ErrorCode {
+ /** An unknown error has occurred. */
+ INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+ /** An encoding is missing in the Java native libraries. */
+ INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
+
+ /** Error while escaping a row. */
+ INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."),
+
+ /** Error while escaping a row. */
+ INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
+
+ /** Column type isn't known by Intermediate Data Format. */
+ INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
+
+ /** Wrong number of fields in a row. */
+ INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
+
+ ;
+
+ private final String message;
+
+ private IntermediateDataFormatError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
new file mode 100644
index 0000000..df6d30f
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CSVIntermediateDataFormatTest {
+
+ private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
+
+ private IntermediateDataFormat<?> data;
+
+ @Before
+ public void setUp() {
+ data = new CSVIntermediateDataFormat();
+ }
+
+ private String getByteFieldString(byte[] byteFieldData) {
+ try {
+ return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
+ } catch(UnsupportedEncodingException e) {
+ // Should never get to this point because ISO-8859-1 is a standard codec.
+ return null;
+ }
+ }
+
+ @Test
+ public void testStringInStringOut() {
+ String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'" + String.valueOf(0x0A) + "'";
+ data.setTextData(testData);
+ assertEquals(testData, data.getTextData());
+ }
+
+ @Test
+ public void testNullStringInObjectOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData(null);
+
+ Object[] out = data.getObjectData();
+
+ assertNull(out);
+ }
+
+ @Test(expected=SqoopException.class)
+ public void testEmptyStringInObjectOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData("");
+
+ data.getObjectData();
+ }
+
+ @Test
+ public void testStringInObjectOut() {
+
+ //byte[0] = -112, byte[1] = 54 - 2's complements
+ String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'\\n'";
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+ data.setTextData(testData);
+
+ Object[] out = data.getObjectData();
+
+ assertEquals(new Long(10),out[0]);
+ assertEquals(new Long(34),out[1]);
+ assertEquals("54",out[2]);
+ assertEquals("random data",out[3]);
+ assertEquals(-112, ((byte[])out[4])[0]);
+ assertEquals(54, ((byte[])out[4])[1]);
+ assertEquals("\n", out[5].toString());
+ }
+
+ @Test
+ public void testObjectInStringOut() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+
+ byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+ Object[] in = new Object[6];
+ in[0] = new Long(10);
+ in[1] = new Long(34);
+ in[2] = "54";
+ in[3] = "random data";
+ in[4] = byteFieldData;
+ in[5] = new String(new char[] { 0x0A });
+
+ data.setObjectData(in);
+
+ //byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complements
+ String testData = "10,34,'54','random data'," +
+ getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+ assertEquals(testData, data.getTextData());
+ }
+
+ @Test
+ public void testObjectInObjectOut() {
+ //Test escapable sequences too.
+ //byte[0] = -112, byte[1] = 54 - 2's complements
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1"))
+ .addColumn(new FixedPoint("2"))
+ .addColumn(new Text("3"))
+ .addColumn(new Text("4"))
+ .addColumn(new Binary("5"))
+ .addColumn(new Text("6"));
+ data.setSchema(schema);
+
+ Object[] in = new Object[6];
+ in[0] = new Long(10);
+ in[1] = new Long(34);
+ in[2] = "54";
+ in[3] = "random data";
+ in[4] = new byte[] { (byte) -112, (byte) 54};
+ in[5] = new String(new char[] { 0x0A });
+ Object[] inCopy = new Object[6];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+
+ @Test
+ public void testStringFullRangeOfCharacters() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new Text("1"));
+ data.setSchema(schema);
+
+ char[] allCharArr = new char[256];
+ for(int i = 0; i < allCharArr.length; ++i) {
+ allCharArr[i] = (char)i;
+ }
+ String strData = new String(allCharArr);
+
+ Object[] in = {strData};
+ Object[] inCopy = new Object[1];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+
+ assertEquals(strData, data.getObjectData()[0]);
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+
+ @Test
+ public void testByteArrayFullRangeOfCharacters() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new Binary("1"));
+ data.setSchema(schema);
+
+ byte[] allCharByteArr = new byte[256];
+ for(int i = 0; i < allCharByteArr.length; ++i) {
+ allCharByteArr[i] = (byte)i;
+ }
+
+ Object[] in = {allCharByteArr};
+ Object[] inCopy = new Object[1];
+ System.arraycopy(in,0,inCopy,0,in.length);
+
+ // Modifies the input array, so we use the copy to confirm
+ data.setObjectData(in);
+ assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index e052584..1700432 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -22,6 +22,7 @@ import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.request.HttpEventContext;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
@@ -327,6 +328,10 @@ public class JobManager implements Reconfigurable {
request.setJobName(job.getName());
request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId);
+ Class<? extends IntermediateDataFormat<?>> dataFormatClass =
+ connector.getIntermediateDataFormat();
+ request.setIntermediateDataFormat(dataFormatClass);
// Let's register all important jars
// sqoop-common
@@ -343,6 +348,9 @@ public class JobManager implements Reconfigurable {
// Extra libraries that Sqoop code requires
request.addJarForClass(JSONValue.class);
+ // The IDF is used in the ETL process.
+ request.addJarForClass(dataFormatClass);
+
// Get connector callbacks
switch (job.getType()) {
case IMPORT:
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index a138db5..7900eee 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.model.MJob;
@@ -107,6 +108,11 @@ public class SubmissionRequest {
*/
Integer loaders;
+ /**
+ * The intermediate data format this submission should use.
+ */
+ Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
public SubmissionRequest() {
this.jars = new LinkedList<String>();
this.connectorContext = new MutableMapContext();
@@ -252,4 +258,13 @@ public class SubmissionRequest {
public void setLoaders(Integer loaders) {
this.loaders = loaders;
}
+
+ public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+ return intermediateDataFormat;
+ }
+
+ public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+ this.intermediateDataFormat = intermediateDataFormat;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index f9a2a0e..b23b905 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -52,6 +52,10 @@ limitations under the License.
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<!-- See profiles for Hadoop specific dependencies -->
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 5c0a027..84f6213 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -34,7 +34,7 @@ import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Importer;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
@@ -53,14 +53,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
return new MRSubmissionRequest();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void prepareImportSubmission(SubmissionRequest gRequest) {
- MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
- ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
-
+ public void prepareSubmission(MRSubmissionRequest request) {
// Add jar dependencies
addDependencies(request);
@@ -68,13 +61,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
request.setInputFormatClass(SqoopInputFormat.class);
request.setMapperClass(SqoopMapper.class);
- request.setMapOutputKeyClass(Data.class);
+ request.setMapOutputKeyClass(SqoopWritable.class);
request.setMapOutputValueClass(NullWritable.class);
- request.setOutputFormatClass(SqoopFileOutputFormat.class);
- request.setOutputKeyClass(Data.class);
+ request.setOutputFormatClass(SqoopNullOutputFormat.class);
+ request.setOutputKeyClass(SqoopWritable.class);
request.setOutputValueClass(NullWritable.class);
+ // Set up framework context
+ MutableMapContext context = request.getFrameworkContext();
+ context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ request.getIntermediateDataFormat().getName());
+
+ if(request.getExtractors() != null) {
+ context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void prepareImportSubmission(SubmissionRequest gRequest) {
+ MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+
+ prepareSubmission(request);
+ request.setOutputFormatClass(SqoopFileOutputFormat.class);
+
+ ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
+
Importer importer = (Importer)request.getConnectorCallbacks();
// Set up framework context
@@ -83,10 +98,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
- if(request.getExtractors() != null) {
- context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
- }
-
// TODO: This settings should be abstracted to core module at some point
if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -137,19 +148,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
- // Add jar dependencies
- addDependencies(request);
-
- // Configure map-reduce classes for import
- request.setInputFormatClass(SqoopInputFormat.class);
-
- request.setMapperClass(SqoopMapper.class);
- request.setMapOutputKeyClass(Data.class);
- request.setMapOutputValueClass(NullWritable.class);
-
- request.setOutputFormatClass(SqoopNullOutputFormat.class);
- request.setOutputKeyClass(Data.class);
- request.setOutputValueClass(NullWritable.class);
+ prepareSubmission(request);
Exporter exporter = (Exporter)request.getConnectorCallbacks();
@@ -162,10 +161,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
// Extractor that will be able to read all supported file types
context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
-
- if(request.getExtractors() != null) {
- context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
- }
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 7fd9a01..b2fa15d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.job;
import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.framework.FrameworkConstants;
public final class JobConstants extends Constants {
/**
@@ -66,6 +67,9 @@ public final class JobConstants extends Constants {
public static final String HADOOP_COMPRESS_CODEC =
"mapred.output.compression.codec";
+ public static final String INTERMEDIATE_DATA_FORMAT =
+ FrameworkConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+
private JobConstants() {
// Disable explicit object creation
}
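
The execution engine stores the chosen class name under this key (see prepareSubmission above); inside the MapReduce tasks that class has to be re-instantiated reflectively from the job configuration. A hedged sketch of that lookup - ClassUtils comes from sqoop-common, and the surrounding mapper wiring is assumed rather than quoted from this commit:

// Sketch: recover the configured IDF inside an MR task (assumed wiring).
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.utils.ClassUtils;

public class IdfLookup {
  static IntermediateDataFormat<?> loadConfiguredFormat(Configuration conf) {
    String className = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
    // ClassUtils.instantiate loads the class by name and calls its no-arg constructor.
    return (IntermediateDataFormat<?>) ClassUtils.instantiate(className);
  }
}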
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
index 1978ec6..43e6463 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -36,7 +36,6 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
/**
* Extract from HDFS.
@@ -50,12 +49,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
private DataWriter dataWriter;
private long rowRead = 0;
- private final char fieldDelimiter;
-
- public HdfsExportExtractor() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
@Override
public void extract(ExtractorContext context,
ConnectionConfiguration connectionConfiguration,
@@ -63,7 +56,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
conf = ((PrefixContext) context.getContext()).getConfiguration();
dataWriter = context.getDataWriter();
- dataWriter.setFieldDelimiter(fieldDelimiter);
try {
HdfsExportPartition p = partition;
@@ -113,7 +105,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
boolean hasNext = filereader.next(line);
while (hasNext) {
rowRead++;
- dataWriter.writeCsvRecord(line.toString());
+ dataWriter.writeStringRecord(line.toString());
line = new Text();
hasNext = filereader.next(line);
if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -173,7 +165,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
next = fileseeker.getPos();
}
rowRead++;
- dataWriter.writeCsvRecord(line.toString());
+ dataWriter.writeStringRecord(line.toString());
}
LOG.info("Extracting ended on position: " + fileseeker.getPos());
filestream.close();
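
The delimiter plumbing is removed because extractors no longer speak CSV: they hand raw lines to the framework and the configured intermediate format owns the field semantics. A simplified sketch of the resulting extract loop (the position/sync-marker handling of the real code is omitted):

    // Each HDFS line is forwarded untouched; the IDF parses fields later.
    Text line = new Text();
    while (filereader.next(line)) {
      rowRead++;
      dataWriter.writeStringRecord(line.toString());
    }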
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index a07c511..d4ffb13 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.utils.ClassUtils;
@@ -38,16 +37,9 @@ public class HdfsSequenceImportLoader extends Loader {
public static final String EXTENSION = ".seq";
- private final char fieldDelimiter;
-
- public HdfsSequenceImportLoader() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
@Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception {
DataReader reader = context.getDataReader();
- reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration();
@@ -87,7 +79,7 @@ public class HdfsSequenceImportLoader extends Loader {
String csv;
Text text = new Text();
- while ((csv = reader.readCsvRecord()) != null) {
+ while ((csv = reader.readTextRecord()) != null) {
text.set(csv);
filewriter.append(text, NullWritable.get());
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 4621942..7b799ca 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -36,18 +37,15 @@ import org.apache.sqoop.utils.ClassUtils;
public class HdfsTextImportLoader extends Loader {
- private final char fieldDelimiter;
private final char recordDelimiter;
public HdfsTextImportLoader() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
}
@Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception{
DataReader reader = context.getDataReader();
- reader.setFieldDelimiter(fieldDelimiter);
Configuration conf = new Configuration();
// Configuration conf = ((EtlContext)context).getConfiguration();
@@ -81,15 +79,15 @@ public class HdfsTextImportLoader extends Loader {
DataOutputStream filestream = fs.create(filepath, false);
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
- codec.createOutputStream(filestream, codec.createCompressor()),
- Data.CHARSET_NAME));
+ codec.createOutputStream(filestream, codec.createCompressor()),
+ Charsets.UTF_8));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
- filestream, Data.CHARSET_NAME));
+ filestream, Charsets.UTF_8));
}
String csv;
- while ((csv = reader.readCsvRecord()) != null) {
+ while ((csv = reader.readTextRecord()) != null) {
filewriter.write(csv + recordDelimiter);
}
filewriter.close();
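
The loader side is symmetric: records arrive as ready-made text from the intermediate format, so the loader only appends its record delimiter, and Guava's Charsets.UTF_8 replaces the charset name that lived on the removed Data class. The core of the loop, as a sketch:

    // Records come back fully rendered; just delimit and write them out.
    String csv;
    while ((csv = reader.readTextRecord()) != null) {
      filewriter.write(csv + recordDelimiter);
    }
    filewriter.close();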
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
new file mode 100644
index 0000000..ed118d2
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SqoopWritable implements WritableComparable<SqoopWritable> {
+ private String strData;
+
+ public SqoopWritable() {}
+
+ public void setString(String data) {
+ strData = data;
+ }
+
+ public String getString() {
+ return strData;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(strData);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ strData = in.readUTF();
+ }
+
+ @Override
+ public int compareTo(SqoopWritable o) {
+ return strData.compareTo(o.getString());
+ }
+
+ @Override
+ public String toString() {
+ return getString();
+ }
+}
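
A quick round-trip shows the writable's contract; this is a self-contained sketch using plain java.io streams (run it inside any method that throws IOException). One caveat worth noting: DataOutput.writeUTF() caps the encoded payload at 65535 bytes, so extremely wide rows would need a different encoding.

    SqoopWritable in = new SqoopWritable();
    in.setString("1,'foo',2.5");

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    in.write(new DataOutputStream(bytes));        // serialize

    SqoopWritable out = new SqoopWritable();
    out.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));  // deserialize
    assert in.getString().equals(out.getString());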
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index 356ae8a..bbf7342 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -34,13 +34,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
/**
* An output format for a MapReduce job.
*/
public class SqoopFileOutputFormat
- extends FileOutputFormat<Data, NullWritable> {
+ extends FileOutputFormat<SqoopWritable, NullWritable> {
public static final Logger LOG =
Logger.getLogger(SqoopFileOutputFormat.class);
@@ -49,7 +49,7 @@ public class SqoopFileOutputFormat
DefaultCodec.class;
@Override
- public RecordWriter<Data, NullWritable> getRecordWriter(
+ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) throws IOException {
Configuration conf = context.getConfiguration();
@@ -69,6 +69,7 @@ public class SqoopFileOutputFormat
return executor.getRecordWriter();
}
+ @Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
Path output = getOutputPath(context);
return new DestroyerFileOutputCommitter(output, context);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 92de37e..645dbc6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -27,21 +27,22 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
/**
* A mapper that performs the map function.
*/
-public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
static {
ConfigurationUtils.configureLogging();
@@ -52,6 +53,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
+ private IntermediateDataFormat data = null;
+ private SqoopWritable dataOut = null;
@Override
public void run(Context context) throws IOException, InterruptedException {
@@ -60,6 +63,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
+ String intermediateDataFormatName = conf.get(JobConstants
+ .INTERMEDIATE_DATA_FORMAT);
+ data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
+ data.setSchema(ConfigurationUtils.getConnectorSchema(conf));
+ dataOut = new SqoopWritable();
+
// Objects that should be passed to the Executor execution
PrefixContext subContext = null;
Object configConnection = null;
@@ -109,46 +118,38 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
}
}
- public class MapDataWriter extends DataWriter {
+ private class MapDataWriter extends DataWriter {
private Context context;
- private Data data;
public MapDataWriter(Context context) {
this.context = context;
}
@Override
- public void setFieldDelimiter(char fieldDelimiter) {
- if (data == null) {
- data = new Data();
- }
-
- data.setFieldDelimiter(fieldDelimiter);
- }
-
- @Override
public void writeArrayRecord(Object[] array) {
- writeContent(array, Data.ARRAY_RECORD);
+ data.setObjectData(array);
+ writeContent();
}
@Override
- public void writeCsvRecord(String csv) {
- writeContent(csv, Data.CSV_RECORD);
+ public void writeStringRecord(String text) {
+ data.setTextData(text);
+ writeContent();
}
@Override
- public void writeContent(Object content, int type) {
- if (data == null) {
- data = new Data();
- }
+ public void writeRecord(Object obj) {
+ data.setData(obj.toString());
+ writeContent();
+ }
- data.setContent(content, type);
+ private void writeContent() {
try {
- context.write(data, NullWritable.get());
+ dataOut.setString(data.getTextData());
+ context.write(dataOut, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
}
}
}
-
}
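
The net effect of the MapDataWriter rewrite: every record a connector emits, regardless of shape, is normalized through the intermediate data format and leaves the mapper as text. All three write paths collapse to the same tail, sketched here:

    // writeArrayRecord / writeStringRecord / writeRecord all reduce to:
    data.setObjectData(array);             // or setTextData(...) / setData(...)
    dataOut.setString(data.getTextData()); // IDF renders the canonical text form
    context.write(dataOut, NullWritable.get());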
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 90de6ef..b3461bb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -28,14 +28,14 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException;
/**
* An output format for MapReduce job.
*/
-public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
public static final Logger LOG =
Logger.getLogger(SqoopNullOutputFormat.class);
@@ -46,7 +46,7 @@ public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
}
@Override
- public RecordWriter<Data, NullWritable> getRecordWriter(
+ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) {
SqoopOutputFormatLoadExecutor executor =
new SqoopOutputFormatLoadExecutor(context);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 7dedee9..6efadf6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -31,14 +31,16 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.utils.ClassUtils;
public class SqoopOutputFormatLoadExecutor {
@@ -48,7 +50,7 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
- private volatile Data data;
+ private volatile IntermediateDataFormat data;
private JobContext context;
private SqoopRecordWriter producer;
private Future<?> consumerFuture;
@@ -60,17 +62,19 @@ public class SqoopOutputFormatLoadExecutor {
SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
this.isTest = isTest;
this.loaderName = loaderName;
- data = new Data();
+ data = new CSVIntermediateDataFormat();
producer = new SqoopRecordWriter();
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
- data = new Data();
context = jobctx;
producer = new SqoopRecordWriter();
+ data = (IntermediateDataFormat) ClassUtils.instantiate(context
+ .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+ data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration()));
}
- public RecordWriter<Data, NullWritable> getRecordWriter() {
+ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
("OutputFormatLoader-consumer").build()).submit(
new ConsumerThread());
@@ -81,14 +85,13 @@ public class SqoopOutputFormatLoadExecutor {
* This is a producer-consumer problem and can be solved
* with two semaphores.
*/
- private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+ private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
@Override
- public void write(Data key, NullWritable value) throws InterruptedException {
+ public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
free.acquire();
checkIfConsumerThrew();
- int type = key.getType();
- data.setContent(key.getContent(type), type);
+ data.setTextData(key.getString());
filled.release();
}
@@ -135,48 +138,68 @@ public class SqoopOutputFormatLoadExecutor {
}
private class OutputFormatDataReader extends DataReader {
- @Override
- public void setFieldDelimiter(char fieldDelimiter) {
- data.setFieldDelimiter(fieldDelimiter);
- }
@Override
public Object[] readArrayRecord() throws InterruptedException {
- return (Object[])readContent(Data.ARRAY_RECORD);
+ acquireSema();
+ // If the writer has finished, there is definitely no data remaining
+ if (writerFinished) {
+ return null;
+ }
+ try {
+ return data.getObjectData();
+ } finally {
+ releaseSema();
+ }
}
@Override
- public String readCsvRecord() throws InterruptedException {
- return (String)readContent(Data.CSV_RECORD);
+ public String readTextRecord() throws InterruptedException {
+ acquireSema();
+ // If the writer has finished, there is definitely no data remaining
+ if (writerFinished) {
+ return null;
+ }
+ try {
+ return data.getTextData();
+ } finally {
+ releaseSema();
+ }
}
@Override
- public Object readContent(int type) throws InterruptedException {
- // Has any more data been produced after I last consumed.
- // If no, wait for the producer to produce.
- try {
- filled.acquire();
- } catch (InterruptedException ex) {
- //Really at this point, there is nothing to do. Just throw and get out
- LOG.error("Interrupted while waiting for data to be available from " +
- "mapper", ex);
- throw ex;
- }
- // If the writer has finished, there is definitely no data remaining
+ public Object readContent() throws InterruptedException {
+ acquireSema();
if (writerFinished) {
return null;
}
try {
- Object content = data.getContent(type);
- return content;
+ return data.getData();
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
} finally {
- free.release();
+ releaseSema();
+ }
+ }
+
+ private void acquireSema() throws InterruptedException {
+ // Has any more data been produced since I last consumed?
+ // If not, wait for the producer to produce.
+ try {
+ filled.acquire();
+ } catch (InterruptedException ex) {
+ // Really, at this point there is nothing to do. Just throw and get out.
+ LOG.error("Interrupted while waiting for data to be available from " +
+ "mapper", ex);
+ throw ex;
}
}
+
+ private void releaseSema(){
+ free.release();
+ }
}
private class ConsumerThread implements Runnable {
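
The acquireSema()/releaseSema() refactoring above is the classic single-slot producer-consumer exchange: free starts at 1 and filled at 0, so writer and reader strictly alternate. A stripped-down, self-contained sketch of the same pattern (illustrative names, not the Sqoop classes):

    import java.util.concurrent.Semaphore;

    class SingleSlotExchange {
      private final Semaphore free = new Semaphore(1);   // slot starts empty
      private final Semaphore filled = new Semaphore(0); // no data yet
      private volatile String slot;

      void produce(String value) throws InterruptedException {
        free.acquire();   // wait for the consumer to drain the slot
        slot = value;
        filled.release(); // signal that data is available
      }

      String consume() throws InterruptedException {
        filled.acquire(); // wait for the producer to fill the slot
        try {
          return slot;
        } finally {
          free.release(); // hand the slot back to the producer
        }
      }
    }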
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
index 98a2c51..a55534a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException;
import java.util.concurrent.Executors;
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
/**
* A reducer that performs the reduce function.
*/
-public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> {
+public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
static {
ConfigurationUtils.configureLogging();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
index 39d1b53..a849394 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.execution.mapreduce;
import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression;
@@ -71,6 +72,7 @@ public class MapreduceExecutionEngineTest {
request.setConnectorCallbacks(new Importer(Initializer.class,
Partitioner.class, Extractor.class, Destroyer.class) {
});
+ request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
executionEngine.prepareImportSubmission(request);
MutableMapContext context = request.getFrameworkContext();
@@ -97,6 +99,7 @@ public class MapreduceExecutionEngineTest {
request.setConnectorCallbacks(new Importer(Initializer.class,
Partitioner.class, Extractor.class, Destroyer.class) {
});
+ request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
executionEngine.prepareImportSubmission(request);
MutableMapContext context = request.getFrameworkContext();
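
In short, a submission request must now name its intermediate format before the engine prepares the job, otherwise the JobConstants.INTERMEDIATE_DATA_FORMAT key would be left unset. The two added test lines capture exactly that ordering:

    // Must precede prepareImportSubmission(), or the MR config key stays empty.
    request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
    executionEngine.prepareImportSubmission(request);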
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
index e21f15b..09e5ec5 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
@@ -44,17 +44,17 @@ public class JobUtils {
}
public static void runJob(Configuration conf,
- Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
- Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
- Class<? extends OutputFormat<Data, NullWritable>> output)
- throws IOException, InterruptedException, ClassNotFoundException {
+ Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+ Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+ Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
+ throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setInputFormatClass(input);
job.setMapperClass(mapper);
- job.setMapOutputKeyClass(Data.class);
+ job.setMapOutputKeyClass(SqoopWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(output);
- job.setOutputKeyClass(Data.class);
+ job.setOutputKeyClass(SqoopWritable.class);
job.setOutputValueClass(NullWritable.class);
boolean success = job.waitForCompletion(true);