You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/15 19:35:20 UTC

git commit: SQOOP-906: Sqoop is always calling ConnectionManager.datetimeToQueryString with TIMESTAMP column type

Updated Branches:
  refs/heads/trunk ab4bfb9a8 -> 549511378


SQOOP-906: Sqoop is always calling ConnectionManager.datetimeToQueryString with TIMESTAMP column type

(Raghav Kumar Gautam via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/54951137
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/54951137
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/54951137

Branch: refs/heads/trunk
Commit: 5495113781286de00473eb3d8c535f4288454082
Parents: ab4bfb9
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jul 15 10:34:38 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jul 15 10:34:38 2013 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/manager/ConnManager.java   |   5 +
 .../org/apache/sqoop/manager/OracleManager.java |   3 +
 src/java/org/apache/sqoop/tool/ImportTool.java  |   3 +-
 .../cloudera/sqoop/TestIncrementalImport.java   |   2 +-
 src/test/com/cloudera/sqoop/TestMerge.java      |   4 +-
 .../com/cloudera/sqoop/ThirdPartyTests.java     |   2 +
 .../sqoop/testutil/BaseSqoopTestCase.java       |   9 +-
 .../oracle/OracleIncrementalImportTest.java     | 187 +++++++++++++++++++
 8 files changed, 207 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index c84c859..f4b22f9 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -721,6 +721,11 @@ public abstract class ConnManager {
    * be inserted into a SQL statement, representing that date/time.
    */
   public String datetimeToQueryString(String datetime, int columnType) {
+    if (columnType != Types.TIMESTAMP && columnType != Types.DATE) {
+      String msg = "Column type is neither timestamp nor date!";
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
     return "'" + datetime + "'";
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/OracleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index 686bc19..f6f3afa 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -593,6 +593,9 @@ public class OracleManager
     if (columnType == Types.TIMESTAMP) {
       return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
     } else if (columnType == Types.DATE) {
+      // converting timestamp of the form 2012-11-11 11:11:11.00 to
+      // date of the form 2012-11-11 11:11:11
+      datetime = datetime.split("\\.")[0];
       return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
     } else {
       String msg = "Column type is neither timestamp nor date!";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index cb800b6..fbbde1d 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -283,7 +283,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       }
       break;
     case DateLastModified:
-      checkColumnType = Types.TIMESTAMP;
+      checkColumnType = manager.getColumnTypes(options.getTableName(),
+        options.getSqlQuery()).get(options.getIncrementalTestColumn());
       nextVal = manager.getCurrentDbTimestamp();
       if (null == nextVal) {
         throw new IOException("Could not get current time from database");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 02080df..8eadcdd 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -424,7 +424,7 @@ public class TestIncrementalImport extends TestCase {
       args.add("--incremental");
       args.add("lastmodified");
       args.add("--check-column");
-      args.add("last_modified");
+      args.add("LAST_MODIFIED");
     }
     args.add("--columns");
     args.add("id");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 5010cf2..cc1a3a9 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -160,7 +160,7 @@ public class TestMerge extends BaseSqoopTestCase {
     // Do an import of this data into the "old" dataset.
     options.setTargetDir(new Path(warehouse, "merge-old").toString());
     options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("lastmod");
+    options.setIncrementalTestColumn("LASTMOD");
 
     ImportTool importTool = new ImportTool();
     Sqoop importer = new Sqoop(importTool, options.getConf(), options);
@@ -204,7 +204,7 @@ public class TestMerge extends BaseSqoopTestCase {
     options.setNumMappers(1);
     options.setTargetDir(new Path(warehouse, "merge-new").toString());
     options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("lastmod");
+    options.setIncrementalTestColumn("LASTMOD");
     options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
 
     importTool = new ImportTool();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/ThirdPartyTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
index ada5c72..b2db1b4 100644
--- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java
+++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
@@ -44,6 +44,7 @@ import com.cloudera.sqoop.manager.PostgresqlImportTest;
 
 import org.apache.sqoop.manager.mysql.MySqlCallExportTest;
 import org.apache.sqoop.manager.oracle.OracleCallExportTest;
+import org.apache.sqoop.manager.oracle.OracleIncrementalImportTest;
 import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportDelimitedFileManualTest;
 import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportSequenceFileManualTest;
 import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeImportDelimitedFileManualTest;
@@ -82,6 +83,7 @@ public final class ThirdPartyTests extends TestCase {
     suite.addTestSuite(OracleExportTest.class);
     suite.addTestSuite(OracleManagerTest.class);
     suite.addTestSuite(OracleCompatTest.class);
+    suite.addTestSuite(OracleIncrementalImportTest.class);
 
     // SQL Server
     suite.addTestSuite(SQLServerDatatypeExportDelimitedFileManualTest.class);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 877d7f8..793c23e 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -306,8 +306,6 @@ public abstract class BaseSqoopTestCase extends TestCase {
     PreparedStatement statement = null;
     String createTableStr = null;
     String columnDefStr = "";
-    String columnListStr = "";
-    String valueListStr = "";
 
     try {
       try {
@@ -344,10 +342,13 @@ public abstract class BaseSqoopTestCase extends TestCase {
         }
       }
 
-      if (vals!=null) {
+      for (int count=0; vals != null && count < vals.length/colTypes.length;
+           ++count ) {
+        String columnListStr = "";
+        String valueListStr = "";
         for (int i = 0; i < colTypes.length; i++) {
           columnListStr += colNames[i];
-          valueListStr += vals[i];
+          valueListStr += vals[count * colTypes.length + i];
           if (i < colTypes.length - 1) {
             columnListStr += ", ";
             valueListStr += ", ";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
new file mode 100644
index 0000000..3bbb1b1
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.OracleUtils;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test incremental import (lastmodified mode) with the Oracle db.
+ */
+public class OracleIncrementalImportTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      OracleIncrementalImportTest.class.getName());
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return OracleUtils.CONNECT_STRING;
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    OracleUtils.setOracleAuth(opts);
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    OracleUtils.dropTable(table, getManager());
+  }
+
+  /** the names of the tables we're creating. */
+  private List<String> tableNames;
+
+  @Override
+  public void tearDown() {
+    // Clean up the database on our way out.
+    for (String tableName : tableNames) {
+      try {
+        dropTableIfExists(tableName);
+      } catch (SQLException e) {
+        LOG.warn("Error trying to drop table '" + tableName
+                 + "' on tearDown: " + e);
+      }
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @param tableName tableName to be used.
+   * @param connPropsFileName connection properties to use
+   * @param checkColumnName name of the column to use for check-column
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(String tableName, String connPropsFileName,
+      String checkColumnName) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--target-dir");
+    args.add(getWarehouseDir());
+    args.add("--num-mappers");
+    args.add("1");
+    args.add("--table");
+    args.add(tableName);
+    args.add("--incremental");
+    args.add("lastmodified");
+    args.add("--check-column");
+    args.add(checkColumnName);
+    args.add("--last-value");
+    args.add("2000-01-01 01:01:01.0");
+    args.add("--connection-param-file");
+    args.add(connPropsFileName);
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create a table with a date column.  Run an incremental import on the
+   * table, using the date column as the check-column.
+   */
+  public void testIncrementalImportWithLastModified() throws IOException {
+    tableNames = new ArrayList<String>();
+    String [] types = { "INT", "VARCHAR(10)", "DATE", };
+    String [] vals = {
+        "1", "'old_data'",
+        "TO_DATE('1999-01-01 11:11:11', 'YYYY-MM-DD HH24:MI:SS')",
+        "2", "'new_data'",
+        "TO_DATE('2000-11-11 23:23:23', 'YYYY-MM-DD HH24:MI:SS')", };
+    String tableName = getTableName();
+    tableNames.add(tableName);
+    createTableWithColTypes(types, vals);
+    // Some versions of Oracle's JDBC drivers automatically convert date to
+    // timestamp. Since we don't want this to happen for this test,
+    // we must explicitly use a property file to control this behavior.
+    String connPropsFileName = "connection.properties";
+    createFileWithContent(connPropsFileName,
+        "oracle.jdbc.mapDateToTimestamp=false");
+    String[] args = getArgv(tableName, connPropsFileName, getColName(2));
+    runImport(args);
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path filePath = new Path(warehousePath, "part-m-00000");
+    String output = readLineFromPath(filePath);
+    String expectedVal = "2,new_data,2000-11-11";
+    assertEquals("Incremental import result expected a different string",
+                 expectedVal, output);
+  }
+
+  private void createFileWithContent(String connPropsFileName,
+      String fileContent) throws IOException {
+    File file = new File(connPropsFileName);
+    if(file.exists())
+      file.delete();
+    Writer writer = new BufferedWriter(new FileWriter(connPropsFileName));
+    writer.write(fileContent);
+    writer.close();
+  }
+
+  private String readLineFromPath(Path filePath) throws IOException {
+    BufferedReader reader = null;
+    if (!isOnPhysicalCluster()) {
+      reader = new BufferedReader(new InputStreamReader(new FileInputStream(
+          new File(filePath.toString()))));
+    } else {
+      FileSystem dfs = FileSystem.get(getConf());
+      FSDataInputStream dis = dfs.open(filePath);
+      reader = new BufferedReader(new InputStreamReader(dis));
+    }
+    String line = null;
+    try {
+      line = reader.readLine();
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+    return line;
+  }
+
+}
+