You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/04 06:31:21 UTC

sqoop git commit: SQOOP-2797: Sqoop2: Add new schema object for the Blob

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 7e092f5bf -> cea627fa1


SQOOP-2797: Sqoop2: Add new schema object for the Blob

(Colin Ma via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cea627fa
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cea627fa
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cea627fa

Branch: refs/heads/sqoop2
Commit: cea627fa1465a4a900c8ad02934df568f93f45b1
Parents: 7e092f5
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Feb 3 21:30:57 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Feb 3 21:30:57 2016 -0800

----------------------------------------------------------------------
 .../common/test/asserts/ProviderAsserts.java    | 16 ++++++--
 .../sqoop/common/test/db/DatabaseProvider.java  | 18 ++++++---
 .../sqoop/common/test/db/DerbyProvider.java     |  2 +-
 .../common/test/db/types/DerbyTypeList.java     |  9 ++++-
 .../sqoop/json/util/SchemaSerialization.java    |  4 ++
 .../java/org/apache/sqoop/schema/type/Blob.java | 39 ++++++++++++++++++++
 .../apache/sqoop/schema/type/ColumnType.java    |  1 +
 .../connector/jdbc/GenericJdbcExtractor.java    |  8 +++-
 .../connector/jdbc/util/SqlTypesUtils.java      |  5 ++-
 .../sqoop/connector/common/SqoopIDFUtils.java   |  2 +
 10 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
index ae1b60d..e9cea2d 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
@@ -23,8 +23,8 @@ import org.apache.sqoop.common.test.db.TableName;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.Blob;
 import java.sql.SQLException;
-import java.sql.Statement;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
@@ -37,6 +37,7 @@ import static org.testng.Assert.fail;
 public class ProviderAsserts {
 
   private static final Logger LOG = Logger.getLogger(ProviderAsserts.class);
+  private static final String ERROR_MESSAGE_PRFIX = "Columns do not match on position: ";
 
   /**
    * Assert row in the table.
@@ -59,8 +60,11 @@ public class ProviderAsserts {
         if (expectedValue == null) {
           assertNull(actualValue);
         } else {
-          assertEquals(expectedValue.toString(), actualValue.toString(),
-            "Columns do not match on position: " + i);
+          if (expectedValue instanceof Blob) {
+            assertBlob(rs.getBlob(i), (Blob) expectedValue, i);
+          } else {
+            assertEquals(expectedValue.toString(), actualValue.toString(), ERROR_MESSAGE_PRFIX + i);
+          }
         }
         i++;
       }
@@ -74,6 +78,12 @@ public class ProviderAsserts {
     }
   }
 
+  private static void assertBlob(Blob actualValue, Blob expectedValue, int colPosition) throws SQLException { // compares the byte content of two Blobs
+    byte[] actual = actualValue.getBytes(1, (int)actualValue.length()); // Blob positions are 1-based; length() is a long, so the cast assumes the Blob fits in an int
+    byte[] expected = expectedValue.getBytes(1, (int)expectedValue.length()); // read the expected Blob in full the same way
+    assertEquals(actual, expected, ERROR_MESSAGE_PRFIX + colPosition); // colPosition is only used to build the failure message
+  }
+
   private ProviderAsserts() {
     // Instantiation is prohibited
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
index afc5016..f3efa92 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
@@ -24,7 +24,6 @@ import org.apache.sqoop.common.test.db.types.DefaultTypeList;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -32,6 +31,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.sql.Blob;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -157,7 +157,7 @@ abstract public class DatabaseProvider {
    *
    * @return
    */
-  public DatabaseTypeList getDatabaseTypes() {
+  public DatabaseTypeList getDatabaseTypes() throws Exception {
     return new DefaultTypeList();
   }
 
@@ -304,7 +304,9 @@ abstract public class DatabaseProvider {
    * Return rows that match given conditions.
    *
    * @param tableName Table name
-   * @param conditions Conditions in form of double values - column name and value, for example: "id", 1 or "last_update_date", null
+   * @param conditions Conditions given as pairs - a column name followed by its value, for example:
+   *                   "id", 1 or "last_update_date", null.
+   *                   A Blob value can't be used directly in a where clause, so such a pair is skipped.
    * @return PreparedStatement representing the requested query
    */
   public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] conditions) {
@@ -326,9 +328,10 @@ abstract public class DatabaseProvider {
           throw new RuntimeException("Each odd item should be a string with column name.");
         }
 
-        if(value == null) {
+        // Blob can't be used in where clause directly, skip the where clause for Blob
+        if (value == null) {
           conditionList.add(escapeColumnName((String) columnName) + " IS NULL");
-        } else {
+        } else if (! (value instanceof Blob)) {
           conditionList.add(escapeColumnName((String) columnName) + " = ?");
         }
       }
@@ -340,7 +343,8 @@ abstract public class DatabaseProvider {
       PreparedStatement preparedStatement = getConnection().prepareStatement(sb.toString());
       for(int i = 1; i < conditions.length; i += 2) {
         Object value = conditions[i];
-        if (value != null) {
+        // skip the Blob data type
+        if (value != null && ! (value instanceof Blob)) {
           insertObjectIntoPreparedStatement(preparedStatement, i, value);
         }
       }
@@ -374,6 +378,8 @@ abstract public class DatabaseProvider {
       preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
     } else if (value instanceof BigDecimal) {
       preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
+    } else if (value instanceof Blob) {
+      preparedStatement.setBlob(parameterIndex, (Blob) value);
     } else {
       preparedStatement.setObject(parameterIndex, value);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java
index 8f3e434..839e561 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java
@@ -169,7 +169,7 @@ public class DerbyProvider extends DatabaseProvider {
   }
 
   @Override
-  public DatabaseTypeList getDatabaseTypes() {
+  public DatabaseTypeList getDatabaseTypes() throws Exception {
     return new DerbyTypeList();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java
index 642651d..fc02b83 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java
@@ -17,14 +17,17 @@
  */
 package org.apache.sqoop.common.test.db.types;
 
+import javax.sql.rowset.serial.SerialBlob;
+import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.sql.SQLException;
 
 /**
  * Source: https://db.apache.org/derby/docs/10.7/ref/crefsqlj31068.html
  */
 public class DerbyTypeList extends DatabaseTypeList {
-  public DerbyTypeList() {
+  public DerbyTypeList() throws SQLException, UnsupportedEncodingException {
     super();
 
     // Numeric types
@@ -106,6 +109,10 @@ public class DerbyTypeList extends DatabaseTypeList {
       .build());
 
     // BLOB
+    add(DatabaseType.builder("BLOB(1K)")
+      .addExample("", new SerialBlob("test data".getBytes("ISO-8859-1")), "'test data'")
+      .build());
+
     // CLOB
     // Time
     // Timestamp

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
index 3a3f9e8..ee385c0 100644
--- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
+++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
@@ -31,6 +31,7 @@ import org.apache.sqoop.schema.type.AbstractString;
 import org.apache.sqoop.schema.type.Array;
 import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Blob;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
 import org.apache.sqoop.schema.type.Date;
@@ -238,6 +239,9 @@ public class SchemaSerialization {
     case BIT:
       output = new Bit(name);
       break;
+    case BLOB:
+      output = new Blob(name);
+      break;
     case DATE:
       output = new Date(name);
       break;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/schema/type/Blob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Blob.java b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java
new file mode 100644
index 0000000..17d5e6b
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.schema.type;
+/** Schema column for BLOB data: a {@link Binary} column whose type is reported as {@link ColumnType#BLOB}. */
+public class Blob extends Binary {
+  /** Creates a Blob column; the name is passed unchanged to the {@link Binary} constructor. */
+  public Blob(String name) {
+    super(name);
+  }
+  /** Identifies this column as {@link ColumnType#BLOB}. */
+  @Override
+  public ColumnType getType() {
+    return ColumnType.BLOB;
+  }
+  /** Returns the superclass string form enclosed between "Blob{" and "}". */
+  @Override
+  public String toString() {
+    return new StringBuilder("Blob{")
+            .append(super.toString())
+            .append("}")
+            .toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java
index 9e415bf..ac98ee8 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java
@@ -29,6 +29,7 @@ public enum ColumnType {
   ARRAY,
   BINARY,
   BIT,
+  BLOB,
   DATE,
   DATE_TIME,
   DECIMAL,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
index 0235f28..41af177 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -21,7 +21,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.Blob;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -68,6 +68,11 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
           // check type of the column
           Column schemaColumn = schemaColumns[i];
           switch (schemaColumn.getType()) {
+          case BLOB:
+            // convert the blob to byte[]
+            Blob blob = resultSet.getBlob(i + 1);
+            array[i] = blob.getBytes(1, (int)blob.length());
+            break;
           case DATE:
             // convert the sql date to JODA time as prescribed the Sqoop IDF spec
             array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1));
@@ -83,7 +88,6 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
           default:
             //for anything else
             array[i] = resultSet.getObject(i + 1);
-
           }
         }
         context.getDataWriter().writeArrayRecord(array);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
index a6ffa7c..f8f9f0d 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.jdbc.util;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Blob;
 import org.apache.sqoop.schema.type.Date;
 import org.apache.sqoop.schema.type.DateTime;
 import org.apache.sqoop.schema.type.Decimal;
@@ -88,9 +89,11 @@ public class SqlTypesUtils {
       case Types.BOOLEAN:
         return new Bit(columnName);
 
+      case Types.BLOB:
+        return new Blob(columnName);
+
       case Types.BINARY:
       case Types.VARBINARY:
-      case Types.BLOB:
       case Types.LONGVARBINARY:
         return new Binary(columnName);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index fc25100..9baa743 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -608,6 +608,7 @@ public class SqoopIDFUtils {
             csvString.append(toCSVString(objectArray[i].toString()));
             break;
           case BINARY:
+          case BLOB:
           case UNKNOWN:
             csvString.append(toCSVByteArray((byte[]) objectArray[i]));
             break;
@@ -714,6 +715,7 @@ public class SqoopIDFUtils {
         returnValue = toText(csvString);
         break;
       case BINARY:
+      case BLOB:
         // Unknown is treated as a binary type
       case UNKNOWN:
         returnValue = toByteArray(csvString);