You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/03 01:56:10 UTC
git commit: SQOOP-1560: Sqoop2: Move matcher out of Schema
Repository: sqoop
Updated Branches:
refs/heads/SQOOP-1367 48ce535ba -> 095791bfc
SQOOP-1560: Sqoop2: Move matcher out of Schema
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/095791bf
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/095791bf
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/095791bf
Branch: refs/heads/SQOOP-1367
Commit: 095791bfc0b74db3bed169735825c24cd54ebc00
Parents: 48ce535
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Oct 2 16:55:29 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Oct 2 16:55:29 2014 -0700
----------------------------------------------------------------------
.../idf/CSVIntermediateDataFormat.java | 134 ++++-----
.../connector/idf/IntermediateDataFormat.java | 9 +-
.../connector/idf/matcher/AbstractMatcher.java | 62 -----
.../connector/idf/matcher/LocationMatcher.java | 82 ------
.../connector/idf/matcher/NameMatcher.java | 69 -----
.../connector/matcher/LocationMatcher.java | 78 ++++++
.../apache/sqoop/connector/matcher/Matcher.java | 69 +++++
.../sqoop/connector/matcher/MatcherError.java | 41 +++
.../sqoop/connector/matcher/MatcherFactory.java | 30 ++
.../sqoop/connector/matcher/NameMatcher.java | 74 +++++
.../idf/TestCSVIntermediateDataFormat.java | 160 ++---------
.../org/apache/sqoop/job/mr/SqoopMapper.java | 43 +--
.../job/mr/SqoopOutputFormatLoadExecutor.java | 16 +-
.../java/org/apache/sqoop/job/JobUtils.java | 12 +-
.../org/apache/sqoop/job/TestMapReduce.java | 12 +-
.../java/org/apache/sqoop/job/TestMatching.java | 275 +++++++++++++++++++
16 files changed, 681 insertions(+), 485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 2a49221..02d1a51 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -22,12 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
-import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
-import org.apache.sqoop.connector.idf.matcher.NameMatcher;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
@@ -41,7 +36,6 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
import java.util.regex.Matcher;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@@ -71,8 +65,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
- private Schema fromSchema;
- private Schema toSchema;
+ private Schema schema;
/**
* {@inheritDoc}
@@ -94,11 +87,11 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
* {@inheritDoc}
*/
@Override
- public void setFromSchema(Schema schema) {
+ public void setSchema(Schema schema) {
if(schema == null) {
return;
}
- this.fromSchema = schema;
+ this.schema = schema;
List<Column> columns = schema.getColumns();
int i = 0;
for(Column col : columns) {
@@ -112,19 +105,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
}
/**
- * {@inheritDoc}
- */
- @Override
- public void setToSchema(Schema schema) {
- if(schema == null) {
- return;
- }
- this.toSchema = schema;
- }
-
-
-
- /**
* Custom CSV parser that honors quoting and escaped quotes.
* All other escaping is handled elsewhere.
*
@@ -180,69 +160,68 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
/**
* {@inheritDoc}
- *
- * The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema.
- * We currently have 3 methods of matching fields in one schema to another:
- * - by location
- * - by name
- * - user-defined matching
- *
- * If one schema exists (either to or from) and the other is empty
- * We'll match fields based on location.
- * If both schemas exist, we'll match names of fields.
- *
- * In the future, we may want to let users choose the method
- * Currently nothing is implemented for user-defined matching
*/
@Override
public Object[] getObjectData() {
+ if (schema.isEmpty()) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+ }
+
String[] fields = getFields();
+
if (fields == null) {
return null;
}
- if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
- throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+ if (fields.length != schema.getColumns().size()) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+ "The data " + getTextData() + " has the wrong number of fields.");
}
- AbstractMatcher matcher = getMatcher(fromSchema,toSchema);
- String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema);
- Object[] out = new Object[outFields.length];
-
- int i = 0;
-
- // After getting back the data in order that matches the output schema
- // We need to un-do the CSV escaping
- for (Column col: matcher.getMatchingSchema(fromSchema,toSchema).getColumns()) {
- Type colType = col.getType();
- if (outFields[i] == null) {
+ Object[] out = new Object[fields.length];
+ Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
+ for (int i = 0; i < fields.length; i++) {
+ Type colType = cols[i].getType();
+ if (fields[i].equals("NULL")) {
out[i] = null;
continue;
}
- if (colType == Type.TEXT) {
- out[i] = unescapeStrings(outFields[i]);
- } else if (colType == Type.BINARY) {
- out[i] = unescapeByteArray(outFields[i]);
- } else if (colType == Type.FIXED_POINT) {
- Long byteSize = ((FixedPoint) col).getByteSize();
- if (byteSize != null && byteSize <= Integer.SIZE) {
- out[i] = Integer.valueOf(outFields[i]);
- } else {
- out[i] = Long.valueOf(outFields[i]);
- }
- } else if (colType == Type.FLOATING_POINT) {
- Long byteSize = ((FloatingPoint) col).getByteSize();
- if (byteSize != null && byteSize <= Float.SIZE) {
- out[i] = Float.valueOf(outFields[i]);
- } else {
- out[i] = Double.valueOf(outFields[i]);
- }
- } else if (colType == Type.DECIMAL) {
- out[i] = new BigDecimal(outFields[i]);
- } else {
- throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
+
+ Long byteSize;
+ switch(colType) {
+ case TEXT:
+ out[i] = unescapeStrings(fields[i]);
+ break;
+ case BINARY:
+ out[i] = unescapeByteArray(fields[i]);
+ break;
+ case FIXED_POINT:
+ byteSize = ((FixedPoint) cols[i]).getByteSize();
+ if (byteSize != null && byteSize <= Integer.SIZE) {
+ out[i] = Integer.valueOf(fields[i]);
+ } else {
+ out[i] = Long.valueOf(fields[i]);
+ }
+ break;
+ case FLOATING_POINT:
+ byteSize = ((FloatingPoint) cols[i]).getByteSize();
+ if (byteSize != null && byteSize <= Float.SIZE) {
+ out[i] = Float.valueOf(fields[i]);
+ } else {
+ out[i] = Double.valueOf(fields[i]);
+ }
+ break;
+ case DECIMAL:
+ out[i] = new BigDecimal(fields[i]);
+ break;
+ case DATE:
+ case DATE_TIME:
+ case BIT:
+ out[i] = fields[i];
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
}
- i++;
}
return out;
}
@@ -380,15 +359,4 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public String toString() {
return data;
}
-
- private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
- if (toSchema.isEmpty() || fromSchema.isEmpty()) {
- return new LocationMatcher();
- } else {
- return new NameMatcher();
- }
-
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index d98b779..5ef6fc6 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -111,14 +111,7 @@ public abstract class IntermediateDataFormat<T> {
*
* @param schema - the schema used for reading data
*/
- public abstract void setFromSchema(Schema schema);
-
- /**
- * Set the schema for writing data.
- *
- * @param schema - the schema used for writing data
- */
- public abstract void setToSchema(Schema schema);
+ public abstract void setSchema(Schema schema);
/**
* Serialize the fields of this object to <code>out</code>.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
deleted file mode 100644
index e6b2316..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-
-public abstract class AbstractMatcher {
-
- //NOTE: This is currently tightly coupled to the CSV idf. We'll need refactoring after adding additional formats
- //NOTE: There's is a very blatant special case of empty schemas that seem to apply only to HDFS.
-
- /**
- *
- * @param fields
- * @param fromSchema
- * @param toSchema
- * @return Return the data in "fields" converted from matching the fromSchema to matching the toSchema.
- * Right not "converted" means re-ordering if needed and handling nulls.
- */
- abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
-
- /***
- *
- * @param fromSchema
- * @param toSchema
- * @return return a schema with which to read the output data
- * This always returns the toSchema (since this is used when getting output data), unless its empty
- */
- public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
- if (toSchema.isEmpty()) {
- return fromSchema;
- } else {
- return toSchema;
- }
-
- }
-
- protected boolean isNull(String value) {
- if (value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty()) {
- return true;
- }
- return false;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
deleted file mode 100644
index 938a5df..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.SchemaMatchOption;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
-import org.apache.sqoop.schema.type.Type;
-
-import java.math.BigDecimal;
-import java.util.Iterator;
-
-
-/**
- * Convert data according to FROM schema to data according to TO schema
- * This is done based on column location
- * So data in first column in FROM goes into first column in TO, etc
- * If TO schema has more fields and they are "nullable", the value will be set to null
- * If TO schema has extra non-null fields, we'll throw an exception
- */
-public class LocationMatcher extends AbstractMatcher {
-
- public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
- @Override
- public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
-
- String[] out = new String[toSchema.getColumns().size()];
-
- int i = 0;
-
- if (toSchema.isEmpty()) {
- // If there's no destination schema, no need to convert anything
- // Just use the original data
- return fields;
- }
-
- for (Column col: toSchema.getColumns())
- {
- if (i < fields.length) {
- if (isNull(fields[i])) {
- out[i] = null;
- } else {
- out[i] = fields[i];
- }
- }
- // We ran out of fields before we ran out of schema
- else {
- if (!col.getNullable()) {
- throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
- } else {
- LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
- out[i] = null;
- }
- }
- i++;
- }
- return out;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
deleted file mode 100644
index 417c85b..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.type.Column;
-
-import java.util.HashMap;
-
-public class NameMatcher extends AbstractMatcher {
- public static final Logger LOG = Logger.getLogger(NameMatcher.class);
-
- @Override
- public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
- String[] out = new String[toSchema.getColumns().size()];
-
- HashMap<String,Column> colNames = new HashMap<String, Column>();
-
- for (Column fromCol: fromSchema.getColumns()) {
- colNames.put(fromCol.getName(), fromCol);
- }
-
- int toIndex = 0;
-
- for (Column toCol: toSchema.getColumns()) {
- Column fromCol = colNames.get(toCol.getName());
-
- if (fromCol != null) {
- int fromIndex = fromSchema.getColumns().indexOf(fromCol);
- if (isNull(fields[fromIndex])) {
- out[toIndex] = null;
- } else {
- out[toIndex] = fields[fromIndex];
- }
- } else {
- //column exists in TO schema but not in FROM schema
- if (toCol.getNullable() == false) {
- throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
- } else {
- LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
- out[toIndex] = null;
- }
- }
-
- toIndex++;
- }
-
- return out;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
new file mode 100644
index 0000000..58b709e
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+
+/**
+ * Converts data laid out according to the FROM schema into data laid out according to the TO schema.
+ * Matching is done by column location:
+ * data in the first column in FROM goes into the first column in TO, and so on.
+ * If the TO schema has more columns than there are fields and those columns are nullable, their values are set to null.
+ * If the TO schema has extra non-nullable columns, an exception is thrown.
+ */
+public class LocationMatcher extends Matcher {
+
+ public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
+
+ public LocationMatcher(Schema from, Schema to) {
+ super(from, to);
+ }
+
+ @Override
+ public Object[] getMatchingData(Object[] fields) {
+
+ Object[] out = new Object[getToSchema().getColumns().size()];
+
+ int i = 0;
+
+ if (getToSchema().isEmpty()) {
+ // If there's no destination schema, no need to convert anything
+ // Just use the original data
+ return fields;
+ }
+
+ for (Column col: getToSchema().getColumns()) {
+ if (i < fields.length) {
+ if (isNull(fields[i])) {
+ out[i] = null;
+ } else {
+ out[i] = fields[i];
+ }
+ }
+ // We ran out of fields before we ran out of schema
+ else {
+ if (!col.getNullable()) {
+ throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
+ } else {
+ LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
+ out[i] = null;
+ }
+ }
+ i++;
+ }
+ return out;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
new file mode 100644
index 0000000..8ab1318
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+
+public abstract class Matcher {
+
+ private final Schema fromSchema;
+ private final Schema toSchema;
+
+ public Matcher(Schema fromSchema, Schema toSchema) {
+ if (fromSchema.isEmpty() && toSchema.isEmpty()) {
+ throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM or TO schemas been provided.");
+ } else if (toSchema.isEmpty()) {
+ this.fromSchema = fromSchema;
+ this.toSchema = fromSchema;
+ } else if (fromSchema.isEmpty()) {
+ this.fromSchema = toSchema;
+ this.toSchema = toSchema;
+ } else {
+ this.fromSchema = fromSchema;
+ this.toSchema = toSchema;
+ }
+ }
+
+ /**
+ *
+ * @param fields
+ * @return Return the data in "fields" converted from matching the fromSchema to matching the toSchema.
+ * Right now "converted" means re-ordering if needed and handling nulls.
+ */
+ abstract public Object[] getMatchingData(Object[] fields);
+
+ public Schema getFromSchema() {
+ return fromSchema;
+ }
+
+ public Schema getToSchema() {
+ return toSchema;
+ }
+
+ protected boolean isNull(Object value) {
+ if (value == null || value.equals("NULL")
+ || value.equals("null") || value.equals("'null'")
+ || value.equals("")) {
+ return true;
+ }
+ return false;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
new file mode 100644
index 0000000..577b091
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum MatcherError implements ErrorCode {
+ MATCHER_0000("To few Schemas provided."),
+
+ ;
+
+ private final String message;
+
+ private MatcherError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
new file mode 100644
index 0000000..ae89e6c
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.schema.Schema;
+
+public class MatcherFactory {
+ public static Matcher getMatcher(Schema fromSchema, Schema toSchema) {
+ if (toSchema.isEmpty() || fromSchema.isEmpty()) {
+ return new LocationMatcher(fromSchema, toSchema);
+ } else {
+ return new NameMatcher(fromSchema, toSchema);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
new file mode 100644
index 0000000..69d5ebd
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+import java.util.HashMap;
+
+public class NameMatcher extends Matcher {
+
+ public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+
+ public NameMatcher(Schema from, Schema to) {
+ super(from, to);
+ }
+
+ @Override
+ public Object[] getMatchingData(Object[] fields) {
+ Object[] out = new Object[getToSchema().getColumns().size()];
+
+ HashMap<String,Column> colNames = new HashMap<String, Column>();
+
+ for (Column fromCol: getFromSchema().getColumns()) {
+ colNames.put(fromCol.getName(), fromCol);
+ }
+
+ int toIndex = 0;
+
+ for (Column toCol: getToSchema().getColumns()) {
+ Column fromCol = colNames.get(toCol.getName());
+
+ if (fromCol != null) {
+ int fromIndex = getFromSchema().getColumns().indexOf(fromCol);
+ if (isNull(fields[fromIndex])) {
+ out[toIndex] = null;
+ } else {
+ out[toIndex] = fields[fromIndex];
+ }
+ } else {
+ //column exists in TO schema but not in FROM schema
+ if (toCol.getNullable() == false) {
+ throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
+ } else {
+ LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
+ out[toIndex] = null;
+ }
+ }
+
+ toIndex++;
+ }
+
+ return out;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 3954039..765bedd 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -20,7 +20,6 @@ package org.apache.sqoop.connector.idf;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
@@ -40,8 +39,6 @@ public class TestCSVIntermediateDataFormat {
private IntermediateDataFormat<?> data;
- private Schema emptySchema = new Schema("empty");
-
@Before
public void setUp() {
data = new CSVIntermediateDataFormat();
@@ -73,7 +70,7 @@ public class TestCSVIntermediateDataFormat {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
- data.setFromSchema(schema);
+ data.setSchema(schema);
data.setTextData(null);
Object[] out = data.getObjectData();
@@ -90,7 +87,7 @@ public class TestCSVIntermediateDataFormat {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
- data.setFromSchema(schema);
+ data.setSchema(schema);
data.setTextData("");
data.getObjectData();
@@ -110,8 +107,7 @@ public class TestCSVIntermediateDataFormat {
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
- data.setFromSchema(schema);
- data.setToSchema(emptySchema);
+ data.setSchema(schema);
data.setTextData(testData);
Object[] out = data.getObjectData();
@@ -120,7 +116,7 @@ public class TestCSVIntermediateDataFormat {
assertEquals(new Long(34),out[1]);
assertEquals("54",out[2]);
assertEquals("random data",out[3]);
- assertEquals(-112, ((byte[])out[4])[0]);
+ assertEquals(-112, ((byte[]) out[4])[0]);
assertEquals(54, ((byte[])out[4])[1]);
assertEquals("\n", out[5].toString());
}
@@ -134,7 +130,7 @@ public class TestCSVIntermediateDataFormat {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
- data.setFromSchema(schema);
+ data.setSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
@@ -164,8 +160,7 @@ public class TestCSVIntermediateDataFormat {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
- data.setFromSchema(schema);
- data.setToSchema(emptySchema);
+ data.setSchema(schema);
Object[] in = new Object[6];
in[0] = new Long(10);
@@ -188,8 +183,7 @@ public class TestCSVIntermediateDataFormat {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
- data.setFromSchema(schema);
- data.setToSchema(emptySchema);
+ data.setSchema(schema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
@@ -212,148 +206,30 @@ public class TestCSVIntermediateDataFormat {
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
- data.setFromSchema(schema);
- data.setToSchema(emptySchema);
+ data.setSchema(schema);
byte[] allCharByteArr = new byte[256];
- for(int i = 0; i < allCharByteArr.length; ++i) {
- allCharByteArr[i] = (byte)i;
+ for (int i = 0; i < allCharByteArr.length; ++i) {
+ allCharByteArr[i] = (byte) i;
}
Object[] in = {allCharByteArr};
Object[] inCopy = new Object[1];
- System.arraycopy(in,0,inCopy,0,in.length);
+ System.arraycopy(in, 0, inCopy, 0, in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
- /**
- * Note that we don't have an EmptyTo matching test
- * Because most tests above have empty "to" schema
- */
- @Test
- public void testMatchingEmptyFrom() {
-
- data.setFromSchema(emptySchema);
-
- Schema toSchema = new Schema("To");
- toSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"));
- data.setToSchema(toSchema);
-
- Object[] in = new Object[2];
- in[0] = new Long(10);
- in[1] = new Long(34);
-
- Object[] out = new Object[2];
- out[0] = new Long(10);
- out[1] = new Long(34);
-
- data.setObjectData(in);
-
- assertTrue(Arrays.deepEquals(out, data.getObjectData()));
- }
-
@Test(expected=SqoopException.class)
- public void testMatchingTwoEmptySchema() {
- data.setFromSchema(emptySchema);
- data.setToSchema(emptySchema);
-
- Object[] in = new Object[2];
- in[0] = new Long(10);
- in[1] = new Long(34);
-
- data.setObjectData(in);
-
- data.getObjectData();
- }
-
- @Test
- public void testMatchingFewerFromColumns(){
- Schema fromSchema = new Schema("From");
- fromSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"));
- data.setFromSchema(fromSchema);
-
- Schema toSchema = new Schema("To");
- toSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new Text("3"));
- data.setToSchema(toSchema);
-
- Object[] in = new Object[2];
- in[0] = new Long(10);
- in[1] = new Long(34);
-
- Object[] out = new Object[3];
- out[0] = new Long(10);
- out[1] = new Long(34);
- out[2] = null;
-
- data.setObjectData(in);
-
- assertTrue(Arrays.deepEquals(out, data.getObjectData()));
- }
-
- @Test
- public void testMatchingFewerToColumns(){
- Schema fromSchema = new Schema("From");
- fromSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new FixedPoint("3"));
- data.setFromSchema(fromSchema);
-
- Schema toSchema = new Schema("To");
- toSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"));
- data.setToSchema(toSchema);
-
- Object[] in = new Object[3];
- in[0] = new Long(10);
- in[1] = new Long(34);
- in[2] = new Long(50);
-
- Object[] out = new Object[2];
- out[0] = new Long(10);
- out[1] = new Long(34);
-
-
- data.setObjectData(in);
-
- assertTrue(Arrays.deepEquals(out, data.getObjectData()));
- }
-
-
- @Test
- public void testWithSomeNonMatchingFields(){
-
- Schema fromSchema = new Schema("From");
- fromSchema.addColumn(new FixedPoint("1"))
- .addColumn(new FixedPoint("2"))
- .addColumn(new FixedPoint("3"));
- data.setFromSchema(fromSchema);
-
- Schema toSchema = new Schema("From");
- toSchema.addColumn(new FixedPoint("2"))
- .addColumn(new FixedPoint("3"))
- .addColumn(new FixedPoint("4"));
- data.setToSchema(toSchema);
-
- Object[] in = new Object[3];
- in[0] = new Long(10);
- in[1] = new Long(34);
- in[2] = new Long(50);
-
- Object[] out = new Object[3];
- out[0] = new Long(34);
- out[1] = new Long(50);
- out[2] = null;
-
- data.setObjectData(in);
+ public void testEmptySchema() {
+ String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'\\n'";
+ Schema schema = new Schema("Test");
+ data.setSchema(schema);
+ data.setTextData(testData);
- assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+ Object[] out = data.getObjectData();
}
-
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 8c88d52..03d84d4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -29,13 +29,14 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
@@ -54,8 +55,9 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
- private IntermediateDataFormat<String> dataFormat = null;
- private SqoopWritable dataOut = null;
+ private IntermediateDataFormat<String> fromDataFormat = null;
+ private IntermediateDataFormat<String> toDataFormat = null;
+ private Matcher matcher;
@Override
public void run(Context context) throws IOException, InterruptedException {
@@ -64,19 +66,17 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
-
-
- Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
- Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+ matcher = MatcherFactory.getMatcher(
+ ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
+ ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
- dataFormat = (IntermediateDataFormat<String>) ClassUtils
+ fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
-
- dataFormat.setFromSchema(fromSchema);
- dataFormat.setToSchema(toSchema);
-
- dataOut = new SqoopWritable();
+ fromDataFormat.setSchema(matcher.getFromSchema());
+ toDataFormat = (IntermediateDataFormat<String>) ClassUtils
+ .instantiate(intermediateDataFormatName);
+ toDataFormat.setSchema(matcher.getToSchema());
// Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
@@ -109,36 +109,41 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
private class SqoopMapDataWriter extends DataWriter {
private Context context;
+ private SqoopWritable writable;
public SqoopMapDataWriter(Context context) {
this.context = context;
+ this.writable = new SqoopWritable();
}
@Override
public void writeArrayRecord(Object[] array) {
- dataFormat.setObjectData(array);
+ fromDataFormat.setObjectData(array);
writeContent();
}
@Override
public void writeStringRecord(String text) {
- dataFormat.setTextData(text);
+ fromDataFormat.setTextData(text);
writeContent();
}
@Override
public void writeRecord(Object obj) {
- dataFormat.setData(obj.toString());
+ fromDataFormat.setData(obj.toString());
writeContent();
}
private void writeContent() {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Extracted data: " + dataFormat.getTextData());
+ LOG.debug("Extracted data: " + fromDataFormat.getTextData());
}
- dataOut.setString(dataFormat.getTextData());
- context.write(dataOut, NullWritable.get());
+
+ toDataFormat.setObjectData( matcher.getMatchingData( fromDataFormat.getObjectData() ) );
+
+ writable.setString(toDataFormat.getTextData());
+ context.write(writable, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 941b31d..1ebd3e4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -34,6 +34,8 @@ import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.common.PrefixContext;
@@ -52,6 +54,7 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
private volatile IntermediateDataFormat<String> dataFormat;
+ private Matcher matcher;
private JobContext context;
private SqoopRecordWriter writer;
private Future<?> consumerFuture;
@@ -65,19 +68,18 @@ public class SqoopOutputFormatLoadExecutor {
this.loaderName = loaderName;
dataFormat = new CSVIntermediateDataFormat();
writer = new SqoopRecordWriter();
+ matcher = null;
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
writer = new SqoopRecordWriter();
+ matcher = MatcherFactory.getMatcher(
+ ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
+ ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
- .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
-
- Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
- dataFormat.setFromSchema(fromSchema);
-
- Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
- dataFormat.setToSchema(toSchema);
+ .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+ dataFormat.setSchema(matcher.getToSchema());
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
index b5435ff..1952cbb 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -36,14 +36,7 @@ import org.apache.sqoop.job.mr.SqoopSplit;
public class JobUtils {
- public static void runJob(Configuration conf)
- throws IOException, InterruptedException, ClassNotFoundException {
- runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
- (conf.get(JobConstants.HADOOP_OUTDIR) != null) ?
- SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
- }
-
- public static void runJob(Configuration conf,
+ public static boolean runJob(Configuration conf,
Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
@@ -57,8 +50,7 @@ public class JobUtils {
job.setOutputKeyClass(SqoopWritable.class);
job.setOutputValueClass(NullWritable.class);
- boolean success = job.waitForCompletion(true);
- Assert.assertEquals("Job failed!", true, success);
+ return job.waitForCompletion(true);
}
private JobUtils() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 5662120..032cc11 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -54,6 +54,7 @@ import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -96,8 +97,10 @@ public class TestMapReduce {
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
- JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
- DummyOutputFormat.class);
+ ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+ boolean success = JobUtils.runJob(job.getConfiguration(),
+ SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
+ Assert.assertEquals("Job failed!", true, success);
}
@Test
@@ -116,8 +119,11 @@ public class TestMapReduce {
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
- JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+ ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+ boolean success = JobUtils.runJob(job.getConfiguration(),
+ SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
+ Assert.assertEquals("Job failed!", true, success);
// Make sure both destroyers get called.
assertEquals(1, DummyFromDestroyer.count);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
new file mode 100644
index 0000000..7f9a147
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Parameterized test that runs one MapReduce job per FROM/TO schema pair and
+ * checks whether the job succeeds or fails for that pair.
+ */
+@RunWith(Parameterized.class)
+public class TestMatching {
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 1;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 1;
+
+  private Schema from;
+  private Schema to;
+
+  /**
+   * @param from schema registered for Direction.FROM
+   * @param to schema registered for Direction.TO
+   */
+  public TestMatching(Schema from, Schema to) throws Exception {
+    this.from = from;
+    this.to = to;
+    System.out.println("Testing with Schemas\n\tFROM: " + this.from + "\n\tTO: " + this.to);
+  }
+
+  /**
+   * Builds the schema pairs under test. Schemas are named "FROM-x" / "TO-x";
+   * testSchemaMatching() derives the expected outcome from the "x" suffixes.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+
+    Schema emptyFrom = new Schema("FROM-EMPTY");
+    Schema emptyTo = new Schema("TO-EMPTY");
+    Schema from1 = new Schema("FROM-1");
+    Schema to1 = new Schema("TO-1");
+    Schema from2 = new Schema("FROM-2");
+    Schema to2 = new Schema("TO-2");
+
+    from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
+    to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
+
+    // Pairs involving an empty schema on one or both sides.
+    parameters.add(new Object[]{emptyFrom, emptyTo});
+    parameters.add(new Object[]{from1, emptyTo});
+    parameters.add(new Object[]{emptyFrom, to1});
+    // Identical column sets, then column sets that differ in size.
+    parameters.add(new Object[]{from1, to1});
+    parameters.add(new Object[]{from2, to1});
+    parameters.add(new Object[]{from1, to2});
+
+    return parameters;
+  }
+
+  /**
+   * Runs the job once with the parameterized schemas and asserts the outcome
+   * selected by the suffixes of the two schema names.
+   */
+  @Test
+  public void testSchemaMatching() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+    // Run the job exactly once and keep its result for the assertions below.
+    boolean success = JobUtils.runJob(job.getConfiguration(),
+        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
+
+    String fromKind = from.getName().split("-")[1];
+    String toKind = to.getName().split("-")[1];
+    if (fromKind.equals("EMPTY")) {
+      if (toKind.equals("EMPTY")) {
+        Assert.assertEquals("Job succeeded!", false, success);
+      } else {
+        Assert.assertEquals("Job failed!", true, success);
+      }
+    } else {
+      if (toKind.equals("EMPTY")) {
+        Assert.assertEquals("Job failed!", true, success);
+      } else if (fromKind.equals(toKind)) {
+        Assert.assertEquals("Job failed!", true, success);
+      } else {
+        Assert.assertEquals("Job succeeded!", false, success);
+      }
+    }
+  }
+
+  /** Partition that carries only an integer id. */
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+
+    @Override
+    public String toString() {
+      return Integer.toString(id);
+    }
+  }
+
+  /** Creates one DummyPartition per id from START_PARTITION to NUMBER_OF_PARTITIONS. */
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  /** Writes (int, double, String) rows whose values derive from the partition id. */
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        context.getDataWriter().writeArrayRecord(new Object[] {
+            id * NUMBER_OF_ROWS_PER_PARTITION + row,
+            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+
+    @Override
+    public long getRowsRead() {
+      return NUMBER_OF_ROWS_PER_PARTITION;
+    }
+  }
+
+  /** Output format whose record writer asserts on every record it receives. */
+  public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    /** Asserts that each written key equals a Data record built from a running index. */
+    public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
+      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+      private Data data = new Data();
+
+      @Override
+      public void write(SqoopWritable key, NullWritable value) {
+        data.setContent(new Object[] {
+            index,
+            (double) index,
+            String.valueOf(index)},
+            Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(data.toString(), key.toString());
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    /** Committer whose hooks do nothing and that never requests a task commit. */
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+}