You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text copy.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/01/07 12:13:15 UTC
sqoop git commit: SQOOP-1968: Optimize schema operation in getMatchingData of NameMatcher
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 dc50e4074 -> 60066b8f3
SQOOP-1968: Optimize schema operation in getMatchingData of NameMatcher
(Qian Xu via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/60066b8f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/60066b8f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/60066b8f
Branch: refs/heads/sqoop2
Commit: 60066b8f3873cd322735a7b822b9eac16e24c0fc
Parents: dc50e40
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jan 7 12:12:26 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jan 7 12:13:06 2015 +0100
----------------------------------------------------------------------
.../connector/matcher/LocationMatcher.java | 46 +++-----
.../apache/sqoop/connector/matcher/Matcher.java | 15 +++
.../sqoop/connector/matcher/NameMatcher.java | 67 +++++------
.../sqoop/connector/matcher/SchemaFixture.java | 61 ++++++++++
.../connector/matcher/TestLocationMatcher.java | 110 ++++++++++++++++++
.../connector/matcher/TestNameMatcher.java | 116 +++++++++++++++++++
6 files changed, 350 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
index 01adaf0..d92723e 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
@@ -17,62 +17,44 @@
*/
package org.apache.sqoop.connector.matcher;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.type.Column;
-
/**
- * Convert data according to FROM schema to data according to TO schema
- * This is done based on column location
- * So data in first column in FROM goes into first column in TO, etc
- * If TO schema has more fields and they are "nullable", the value will be set to null
- * If TO schema has extra non-null fields, we'll throw an exception
+ * Convert data according to FROM schema to data according to TO schema. This is
+ * done based on column location: data in the first column in FROM goes into the
+ * first column in TO, and so on. If TO schema has more fields and they are
+ * "nullable", their values will be set to null. If TO schema has extra non-null
+ * fields, we will throw an exception.
*/
public class LocationMatcher extends Matcher {
- public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
-
public LocationMatcher(Schema from, Schema to) {
super(from, to);
}
@Override
public Object[] getMatchingData(Object[] fields) {
-
- Object[] out = new Object[getToSchema().getColumnsCount()];
-
- int i = 0;
-
if (getToSchema().isEmpty()) {
- // If there's no destination schema, no need to convert anything
- // Just use the original data
+ // No destination schema found. No need to convert anything.
return fields;
}
- for (Column col: getToSchema().getColumnsArray()) {
+ Object[] out = new Object[getToSchema().getColumnsCount()];
+ int i = 0;
+
+ for (Column col : getToSchema().getColumnsList()) {
if (i < fields.length) {
- if (isNull(fields[i])) {
- out[i] = null;
- } else {
- out[i] = fields[i];
- }
+ Object value = fields[i];
+ out[i] = isNull(value) ? null : value;
}
// We ran out of fields before we ran out of schema
else {
- if (!col.getNullable()) {
- throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
- } else {
- LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
- out[i] = null;
- }
+ tryFillNullInArrayForUnexpectedColumn(col, out, i);
}
i++;
}
return out;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
index 39e0007..554e415 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
@@ -17,12 +17,16 @@
*/
package org.apache.sqoop.connector.matcher;
+import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.ByteArraySchema;
import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
public abstract class Matcher {
+ private static final Logger LOG = Logger.getLogger(Matcher.class);
private final Schema fromSchema;
private final Schema toSchema;
@@ -67,5 +71,16 @@ public abstract class Matcher {
return false;
}
+ protected void tryFillNullInArrayForUnexpectedColumn(Column column,
+ Object[] array, int index) throws SqoopException {
+ if (!column.getNullable()) {
+ throw new SqoopException(SchemaError.SCHEMA_0004, "Target column " +
+ column + " didn't match with any source column and cannot be null.");
+ }
+
+ LOG.warn("Column " + column +
+ " has no matching source column. Will be ignored.");
+ array[index] = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
index c01b916..7cbc39f 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
@@ -17,58 +17,59 @@
*/
package org.apache.sqoop.connector.matcher;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.type.Column;
import java.util.HashMap;
+/**
+ * Convert data according to FROM schema to data according to TO schema. This is
+ * done based on column name, regardless of column position. If TO schema has
+ * more fields and they are "nullable", their values will be set to null. If TO
+ * schema has extra non-null fields, we will throw an exception.
+ */
public class NameMatcher extends Matcher {
- public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+ private HashMap<String, Integer> fromColNameIndexMap;
public NameMatcher(Schema from, Schema to) {
super(from, to);
+
+ fromColNameIndexMap = new HashMap<String, Integer>();
+ int fromIndex = 0;
+
+ for (Column fromCol : getFromSchema().getColumnsList()) {
+ fromColNameIndexMap.put(fromCol.getName(), fromIndex);
+ fromIndex++;
+ }
}
@Override
public Object[] getMatchingData(Object[] fields) {
- Object[] out = new Object[getToSchema().getColumnsCount()];
-
- HashMap<String,Column> colNames = new HashMap<String, Column>();
-
- for (Column fromCol: getFromSchema().getColumnsArray()) {
- colNames.put(fromCol.getName(), fromCol);
+ if (getToSchema().isEmpty()) {
+ // No destination schema found. No need to convert anything.
+ return fields;
}
- int toIndex = 0;
-
- for (Column toCol: getToSchema().getColumnsArray()) {
- Column fromCol = colNames.get(toCol.getName());
+ Object[] out = new Object[getToSchema().getColumnsCount()];
+ int i = 0;
- if (fromCol != null) {
- int fromIndex = getFromSchema().getColumnsList().indexOf(fromCol);
- if (isNull(fields[fromIndex])) {
- out[toIndex] = null;
- } else {
- out[toIndex] = fields[fromIndex];
- }
- } else {
- //column exists in TO schema but not in FROM schema
- if (toCol.getNullable() == false) {
- throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
- } else {
- LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
- out[toIndex] = null;
+ for (Column toCol : getToSchema().getColumnsList()) {
+ boolean assigned = false;
+ if (fromColNameIndexMap.containsKey(toCol.getName())) {
+ int fromIndex = fromColNameIndexMap.get(toCol.getName());
+ if (fromIndex < fields.length) {
+ Object value = fields[fromIndex];
+ out[i] = isNull(value) ? null : value;
+ assigned = true;
}
}
-
- toIndex++;
+ if (!assigned) {
+ tryFillNullInArrayForUnexpectedColumn(toCol, out, i);
+ }
+ i++;
}
-
- return out;
+ return out;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
new file mode 100644
index 0000000..f20b851
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+
+class SchemaFixture {
+
+ public static Schema createSchema1(String name) {
+ Schema schema = new Schema(name);
+ schema.addColumn(new Text("text1"));
+ schema.addColumn(new DateTime("datetime1", false, false));
+ return schema;
+ }
+
+ public static Object[] createNotNullRecordForSchema1() {
+ Object[] fields = new Object[2];
+ fields[0] = "some text";
+ fields[1] = new LocalDateTime("2015-01-01");
+ return fields;
+ }
+
+ public static Schema createSchema(String name, String[] columnNames) {
+ Schema schema = new Schema(name);
+ for (String columnName : columnNames) {
+ if (columnName.startsWith("datetime")) {
+ schema.addColumn(new DateTime(columnName, false, false));
+ } else {
+ schema.addColumn(new Text(columnName));
+ }
+ }
+ return schema;
+ }
+
+ public static Schema createSchema(String name, int numOfColumn) {
+ Schema schema = new Schema(name);
+ for (int i = 0; i < numOfColumn; i++) {
+ schema.addColumn(new Text("text" + i));
+ }
+ return schema;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
new file mode 100644
index 0000000..edf5fd9
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.schema.Schema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestLocationMatcher {
+
+ private LocationMatcher matcher;
+
+ @Before
+ public void setUp() {
+ matcher = null;
+ }
+
+ /**
+ * FROM and TO schemas are identical, fields should be copied directly.
+ */
+ @Test
+ public void testPerfectMatch() {
+ matcher = new LocationMatcher(
+ SchemaFixture.createSchema1("from"),
+ SchemaFixture.createSchema1("to"));
+ Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(fields, actual);
+ }
+
+ /**
+ * When no FROM schema is specified, fields should be copied directly.
+ */
+ @Test
+ public void testDirectFieldsCopy() {
+ matcher = new LocationMatcher(
+ NullSchema.getInstance(),
+ SchemaFixture.createSchema1("to"));
+ Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(fields, actual);
+ }
+
+ /**
+ * Null-like values (null, "NULL", "null", "'null'", "") are converted to null.
+ */
+ @Test
+ public void testNullableFieldConvert() {
+ matcher = new LocationMatcher(
+ SchemaFixture.createSchema("from", 5),
+ SchemaFixture.createSchema("to", 5));
+ Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""};
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(new Object[] {null, null, null, null, null}, actual);
+ }
+
+ /**
+ * If TO schema has more fields than FROM schema, and all of the extra fields
+ * are "nullable", their values will be set to null.
+ */
+ @Test
+ public void testConvertWhenToSchemaIsLongerThanFromSchema() {
+ matcher = new LocationMatcher(
+ SchemaFixture.createSchema("from", 2),
+ SchemaFixture.createSchema("to", 3));
+ Object[] fields = new Object[] {"t1", "t2"};
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(new Object[] {"t1", "t2", null}, actual);
+ }
+
+ /**
+ * If TO schema has more fields than FROM schema, and NOT all of the extra
+ * fields are "nullable", a SqoopException is expected.
+ */
+ @Test (expected = SqoopException.class)
+ public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() {
+ Schema from = SchemaFixture.createSchema("from", 2);
+ Schema to = SchemaFixture.createSchema("to", 4);
+ to.getColumnsList().get(2).setNullable(true);
+ to.getColumnsList().get(3).setNullable(false);
+ matcher = new LocationMatcher(from, to);
+ Object[] fields = new Object[] {"t1", "t2"};
+
+ matcher.getMatchingData(fields);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
new file mode 100644
index 0000000..8d5d720
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.schema.Schema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestNameMatcher {
+
+ private NameMatcher matcher;
+
+ @Before
+ public void setUp() {
+ matcher = null;
+ }
+
+ /**
+ * FROM and TO schemas are identical, fields should be copied directly.
+ */
+ @Test
+ public void testPerfectMatch() {
+ matcher = new NameMatcher(
+ SchemaFixture.createSchema1("from"),
+ SchemaFixture.createSchema1("to"));
+ Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(fields, actual);
+ }
+
+ /**
+ * When no FROM schema is specified, fields should be copied directly.
+ */
+ @Test
+ public void testDirectFieldsCopy() {
+ matcher = new NameMatcher(
+ NullSchema.getInstance(),
+ SchemaFixture.createSchema1("to"));
+ Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(fields, actual);
+ }
+
+ /**
+ * Null-like values (null, "NULL", "null", "'null'", "") are converted to null.
+ */
+ @Test
+ public void testNullableFieldConvert() {
+ matcher = new NameMatcher(
+ SchemaFixture.createSchema("from",
+ new String[]{"text1", "text2", "text3", "text4", "text5"}),
+ SchemaFixture.createSchema("to",
+ new String[]{"text5", "text4", "text2", "text3", "text1"}));
+ Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""};
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(new Object[] {null, null, null, null, null}, actual);
+ }
+
+ /**
+ * If TO schema has more fields than FROM schema, and all of the extra fields
+ * are "nullable", their values will be set to null.
+ */
+ @Test
+ public void testConvertWhenToSchemaIsLongerThanFromSchema() {
+ matcher = new NameMatcher(
+ SchemaFixture.createSchema("from",
+ new String[]{"text1", "text2"}),
+ SchemaFixture.createSchema("to",
+ new String[]{"text3", "text1", "text2"}));
+ Object[] fields = new Object[] {"t1", "t2"};
+
+ Object[] actual = matcher.getMatchingData(fields);
+ assertArrayEquals(new Object[] {null, "t1", "t2"}, actual);
+ }
+
+ /**
+ * If TO schema has more fields than FROM schema, and NOT all of the extra
+ * fields are "nullable", a SqoopException is expected.
+ */
+ @Test (expected = SqoopException.class)
+ public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() {
+ Schema from = SchemaFixture.createSchema("from",
+ new String[]{"text1", "text2"});
+ Schema to = SchemaFixture.createSchema("to",
+ new String[]{"text4", "text3", "text2", "text1"});
+ to.getColumnsList().get(0).setNullable(true);
+ to.getColumnsList().get(1).setNullable(false);
+ matcher = new NameMatcher(from, to);
+ Object[] fields = new Object[] {"t1", "t2"};
+
+ matcher.getMatchingData(fields);
+ }
+
+}
\ No newline at end of file