You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/01/07 12:13:15 UTC

sqoop git commit: SQOOP-1968: Optimize schema operation in getMatchingData of NameMatcher

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 dc50e4074 -> 60066b8f3


SQOOP-1968: Optimize schema operation in getMatchingData of NameMatcher

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/60066b8f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/60066b8f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/60066b8f

Branch: refs/heads/sqoop2
Commit: 60066b8f3873cd322735a7b822b9eac16e24c0fc
Parents: dc50e40
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jan 7 12:12:26 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jan 7 12:13:06 2015 +0100

----------------------------------------------------------------------
 .../connector/matcher/LocationMatcher.java      |  46 +++-----
 .../apache/sqoop/connector/matcher/Matcher.java |  15 +++
 .../sqoop/connector/matcher/NameMatcher.java    |  67 +++++------
 .../sqoop/connector/matcher/SchemaFixture.java  |  61 ++++++++++
 .../connector/matcher/TestLocationMatcher.java  | 110 ++++++++++++++++++
 .../connector/matcher/TestNameMatcher.java      | 116 +++++++++++++++++++
 6 files changed, 350 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
index 01adaf0..d92723e 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
@@ -17,62 +17,44 @@
  */
 package org.apache.sqoop.connector.matcher;
 
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
 import org.apache.sqoop.schema.type.Column;
 
-
 /**
- * Convert data according to FROM schema to data according to TO schema
- * This is done based on column location
- * So data in first column in FROM goes into first column in TO, etc
- * If TO schema has more fields and they are "nullable", the value will be set to null
- * If TO schema has extra non-null fields, we'll throw an exception
+ * Convert data according to FROM schema to data according to TO schema, based
+ * on column location: the value at position i goes into TO column i. Values
+ * beyond the last TO column are dropped. TO columns with no value left in the
+ * record are set to null when "nullable"; if any such column is non-nullable,
+ * a SqoopException is thrown.
  */
 public class LocationMatcher extends Matcher {
 
-  public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
-
   public LocationMatcher(Schema from, Schema to) {
     super(from, to);
   }
 
   @Override
   public Object[] getMatchingData(Object[] fields) {
-
-    Object[] out = new Object[getToSchema().getColumnsCount()];
-
-    int i = 0;
-
     if (getToSchema().isEmpty()) {
-      // If there's no destination schema, no need to convert anything
-      // Just use the original data
+      // Empty TO schema: nothing to match against, pass the record through.
       return fields;
     }
 
-    for (Column col: getToSchema().getColumnsArray()) {
+    Object[] out = new Object[getToSchema().getColumnsCount()];
+    int i = 0;
+    // Position i of the incoming record feeds TO column i.
+    for (Column col : getToSchema().getColumnsList()) {
       if (i < fields.length) {
-        if (isNull(fields[i])) {
-          out[i] = null;
-        } else {
-          out[i] = fields[i];
-        }
+        Object value = fields[i];
+        out[i] = isNull(value) ? null : value;
       }
       // We ran out of fields before we ran out of schema
       else {
-        if (!col.getNullable()) {
-          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
-        } else {
-          LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
-          out[i] = null;
-        }
+        tryFillNullInArrayForUnexpectedColumn(col, out, i);
       }
       i++;
     }
     return out;
   }
 
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
index 39e0007..554e415 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
@@ -17,12 +17,16 @@
  */
 package org.apache.sqoop.connector.matcher;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
 
 public abstract class Matcher {
 
+  private static final Logger LOG = Logger.getLogger(Matcher.class);
   private final Schema fromSchema;
   private final Schema toSchema;
 
@@ -67,5 +71,16 @@ public abstract class Matcher {
     return false;
   }
 
+  protected void tryFillNullInArrayForUnexpectedColumn(Column column,
+      Object[] array, int index) throws SqoopException {
+    if (!column.getNullable()) { // no source value and null is not allowed
+      throw new SqoopException(SchemaError.SCHEMA_0004, "Target column " +
+          column + " didn't match with any source column and cannot be null.");
+    }
+    // Nullable target column: warn, then store null at array[index].
+    LOG.warn("Column " + column +
+        " has no matching source column. Will be ignored.");
+    array[index] = null;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
index c01b916..7cbc39f 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
@@ -17,58 +17,59 @@
  */
 package org.apache.sqoop.connector.matcher;
 
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
 import org.apache.sqoop.schema.type.Column;
 
 import java.util.HashMap;
 
+/**
+ * Convert data according to FROM schema to data according to TO schema. This is
+ * done based on column name. TO columns with no same-named FROM column (or whose
+ * FROM position lies past the end of the record) are set to null when they are
+ * "nullable"; if any such column is non-nullable, a SqoopException is thrown.
+ */
 public class NameMatcher extends Matcher {
 
-  public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+  private final HashMap<String, Integer> fromColNameIndexMap;
 
   public NameMatcher(Schema from, Schema to) {
     super(from, to);
+    // Build the FROM column name -> position index once, not once per record.
+    fromColNameIndexMap = new HashMap<String, Integer>();
+    int fromIndex = 0;
+
+    for (Column fromCol : getFromSchema().getColumnsList()) {
+      fromColNameIndexMap.put(fromCol.getName(), fromIndex);
+      fromIndex++;
+    }
   }
 
   @Override
   public Object[] getMatchingData(Object[] fields) {
-    Object[] out = new Object[getToSchema().getColumnsCount()];
-
-    HashMap<String,Column> colNames = new HashMap<String, Column>();
-
-    for (Column fromCol: getFromSchema().getColumnsArray()) {
-      colNames.put(fromCol.getName(), fromCol);
+    if (getToSchema().isEmpty()) {
+      // Empty TO schema: nothing to match against, pass the record through.
+      return fields;
     }
 
-    int toIndex = 0;
-
-    for (Column toCol: getToSchema().getColumnsArray()) {
-      Column fromCol = colNames.get(toCol.getName());
+    Object[] out = new Object[getToSchema().getColumnsCount()];
+    int i = 0;
 
-      if (fromCol != null) {
-        int fromIndex = getFromSchema().getColumnsList().indexOf(fromCol);
-        if (isNull(fields[fromIndex])) {
-          out[toIndex] = null;
-        } else {
-          out[toIndex] = fields[fromIndex];
-        }
-      } else {
-        //column exists in TO schema but not in FROM schema
-        if (toCol.getNullable() == false) {
-          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
-        } else {
-          LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
-          out[toIndex] = null;
+    for (Column toCol : getToSchema().getColumnsList()) {
+      boolean assigned = false;
+      Integer fromIndex = fromColNameIndexMap.get(toCol.getName()); // null if no FROM column has this name
+      if (fromIndex != null) {
+        if (fromIndex < fields.length) { // record may be shorter than the FROM schema
+          Object value = fields[fromIndex];
+          out[i] = isNull(value) ? null : value;
+          assigned = true;
         }
       }
-
-      toIndex++;
+      if (!assigned) {
+        tryFillNullInArrayForUnexpectedColumn(toCol, out, i);
+      }
+      i++;
     }
-
-  return out;
+    return out;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
new file mode 100644
index 0000000..f20b851
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/SchemaFixture.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+
+class SchemaFixture {
+  /** Schema with a Text column "text1" and a DateTime column "datetime1". */
+  public static Schema createSchema1(String name) {
+    Schema schema = new Schema(name);
+    schema.addColumn(new Text("text1"));
+    schema.addColumn(new DateTime("datetime1", false, false));
+    return schema;
+  }
+  /** Record for createSchema1: a String followed by a LocalDateTime. */
+  public static Object[] createNotNullRecordForSchema1() {
+    Object[] fields = new Object[2];
+    fields[0] = "some text";
+    fields[1] = new LocalDateTime("2015-01-01");
+    return fields;
+  }
+  /** Columns whose name starts with "datetime" are DateTime, others Text. */
+  public static Schema createSchema(String name, String[] columnNames) {
+    Schema schema = new Schema(name);
+    for (String columnName : columnNames) {
+      if (columnName.startsWith("datetime")) {
+        schema.addColumn(new DateTime(columnName, false, false));
+      } else {
+        schema.addColumn(new Text(columnName));
+      }
+    }
+    return schema;
+  }
+  /** Schema of {@code numOfColumn} Text columns named text0, text1, ... */
+  public static Schema createSchema(String name, int numOfColumn) {
+    Schema schema = new Schema(name);
+    for (int i = 0; i < numOfColumn; i++) {
+      schema.addColumn(new Text("text" + i));
+    }
+    return schema;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
new file mode 100644
index 0000000..edf5fd9
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestLocationMatcher.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.schema.Schema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestLocationMatcher {
+  // Matcher under test; each test case constructs its own instance.
+  private LocationMatcher matcher;
+
+  @Before
+  public void setUp() {
+    matcher = null;
+  }
+
+  /**
+   * FROM and TO schemas are identical, fields should be copied directly.
+   */
+  @Test
+  public void testPerfectMatch() {
+    matcher = new LocationMatcher(
+        SchemaFixture.createSchema1("from"),
+        SchemaFixture.createSchema1("to"));
+    Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(fields, actual);
+  }
+
+  /**
+   * When no FROM schema is specified, fields should be copied directly.
+   */
+  @Test
+  public void testDirectFieldsCopy() {
+    matcher = new LocationMatcher(
+        NullSchema.getInstance(),
+        SchemaFixture.createSchema1("to"));
+    Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(fields, actual);
+  }
+
+  /**
+   * Values that denote null (null, "NULL", "null", "'null'", "") become null.
+   */
+  @Test
+  public void testNullableFieldConvert() {
+    matcher = new LocationMatcher(
+        SchemaFixture.createSchema("from", 5),
+        SchemaFixture.createSchema("to", 5));
+    Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""};
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(new Object[] {null, null, null, null, null}, actual);
+  }
+
+  /**
+   * If TO schema has more fields than FROM schema, and all of the extra fields
+   * are "nullable", their values will be set to null.
+   */
+  @Test
+  public void testConvertWhenToSchemaIsLongerThanFromSchema() {
+    matcher = new LocationMatcher(
+        SchemaFixture.createSchema("from", 2),
+        SchemaFixture.createSchema("to", 3));
+    Object[] fields = new Object[] {"t1", "t2"};
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(new Object[] {"t1", "t2", null}, actual);
+  }
+
+  /**
+   * If TO schema has more fields than FROM schema, and NOT all of the extra
+   * fields are "nullable", a SqoopException is expected.
+   */
+  @Test (expected = SqoopException.class)
+  public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() {
+    Schema from = SchemaFixture.createSchema("from", 2);
+    Schema to = SchemaFixture.createSchema("to", 4);
+    to.getColumnsList().get(2).setNullable(true);
+    to.getColumnsList().get(3).setNullable(false);
+    matcher = new LocationMatcher(from, to);
+    Object[] fields = new Object[] {"t1", "t2"};
+
+    matcher.getMatchingData(fields);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/60066b8f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
new file mode 100644
index 0000000..8d5d720
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/matcher/TestNameMatcher.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.schema.Schema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestNameMatcher {
+  // Matcher under test; each test case constructs its own instance.
+  private NameMatcher matcher;
+
+  @Before
+  public void setUp() {
+    matcher = null;
+  }
+
+  /**
+   * FROM and TO schemas are identical, fields should be copied directly.
+   */
+  @Test
+  public void testPerfectMatch() {
+    matcher = new NameMatcher(
+        SchemaFixture.createSchema1("from"),
+        SchemaFixture.createSchema1("to"));
+    Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(fields, actual);
+  }
+
+  /**
+   * When no FROM schema is specified, fields should be copied directly.
+   */
+  @Test
+  public void testDirectFieldsCopy() {
+    matcher = new NameMatcher(
+        NullSchema.getInstance(),
+        SchemaFixture.createSchema1("to"));
+    Object[] fields = SchemaFixture.createNotNullRecordForSchema1();
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(fields, actual);
+  }
+
+  /**
+   * Values that denote null (null, "NULL", "null", "'null'", "") become null.
+   */
+  @Test
+  public void testNullableFieldConvert() {
+    matcher = new NameMatcher(
+        SchemaFixture.createSchema("from",
+            new String[]{"text1", "text2", "text3", "text4", "text5"}),
+        SchemaFixture.createSchema("to",
+            new String[]{"text5", "text4", "text2", "text3", "text1"}));
+    Object[] fields = new Object[] {null, "NULL", "null", "'null'", ""};
+    // TO is a permutation of FROM; since every value maps to null, order is unchecked.
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(new Object[] {null, null, null, null, null}, actual);
+  }
+
+  /**
+   * If TO schema has more fields than FROM schema, and all of the extra fields
+   * are "nullable", their values will be set to null.
+   */
+  @Test
+  public void testConvertWhenToSchemaIsLongerThanFromSchema() {
+    matcher = new NameMatcher(
+        SchemaFixture.createSchema("from",
+            new String[]{"text1", "text2"}),
+        SchemaFixture.createSchema("to",
+            new String[]{"text3", "text1", "text2"}));
+    Object[] fields = new Object[] {"t1", "t2"};
+
+    Object[] actual = matcher.getMatchingData(fields);
+    assertArrayEquals(new Object[] {null, "t1", "t2"}, actual);
+  }
+
+  /**
+   * If TO schema has more fields than FROM schema, and NOT all of the extra
+   * fields are "nullable", a SqoopException is expected.
+   */
+  @Test (expected = SqoopException.class)
+  public void testConvertWhenToSchemaIsLongerThanFromSchemaFail() {
+    Schema from = SchemaFixture.createSchema("from",
+        new String[]{"text1", "text2"});
+    Schema to = SchemaFixture.createSchema("to",
+        new String[]{"text4", "text3", "text2", "text1"});
+    to.getColumnsList().get(0).setNullable(true);
+    to.getColumnsList().get(1).setNullable(false);
+    matcher = new NameMatcher(from, to);
+    Object[] fields = new Object[] {"t1", "t2"};
+
+    matcher.getMatchingData(fields);
+  }
+
+}
\ No newline at end of file