You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/15 19:35:20 UTC
git commit: SQOOP-906: Sqoop is always calling
ConnectionManager.datetimeToQueryString with TIMESTAMP column type
Updated Branches:
refs/heads/trunk ab4bfb9a8 -> 549511378
SQOOP-906: Sqoop is always calling ConnectionManager.datetimeToQueryString with TIMESTAMP column type
(Raghav Kumar Gautam via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/54951137
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/54951137
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/54951137
Branch: refs/heads/trunk
Commit: 5495113781286de00473eb3d8c535f4288454082
Parents: ab4bfb9
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jul 15 10:34:38 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jul 15 10:34:38 2013 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/manager/ConnManager.java | 5 +
.../org/apache/sqoop/manager/OracleManager.java | 3 +
src/java/org/apache/sqoop/tool/ImportTool.java | 3 +-
.../cloudera/sqoop/TestIncrementalImport.java | 2 +-
src/test/com/cloudera/sqoop/TestMerge.java | 4 +-
.../com/cloudera/sqoop/ThirdPartyTests.java | 2 +
.../sqoop/testutil/BaseSqoopTestCase.java | 9 +-
.../oracle/OracleIncrementalImportTest.java | 187 +++++++++++++++++++
8 files changed, 207 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index c84c859..f4b22f9 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -721,6 +721,11 @@ public abstract class ConnManager {
* be inserted into a SQL statement, representing that date/time.
*/
public String datetimeToQueryString(String datetime, int columnType) {
+ if (columnType != Types.TIMESTAMP && columnType != Types.DATE) {
+ String msg = "Column type is neither timestamp nor date!";
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
return "'" + datetime + "'";
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/OracleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index 686bc19..f6f3afa 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -593,6 +593,9 @@ public class OracleManager
if (columnType == Types.TIMESTAMP) {
return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
} else if (columnType == Types.DATE) {
+ // converting timestamp of the form 2012-11-11 11:11:11.00 to
+// date of the form 2012-11-11 11:11:11
+ datetime = datetime.split("\\.")[0];
return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
} else {
String msg = "Column type is neither timestamp nor date!";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index cb800b6..fbbde1d 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -283,7 +283,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
}
break;
case DateLastModified:
- checkColumnType = Types.TIMESTAMP;
+ checkColumnType = manager.getColumnTypes(options.getTableName(),
+ options.getSqlQuery()).get(options.getIncrementalTestColumn());
nextVal = manager.getCurrentDbTimestamp();
if (null == nextVal) {
throw new IOException("Could not get current time from database");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 02080df..8eadcdd 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -424,7 +424,7 @@ public class TestIncrementalImport extends TestCase {
args.add("--incremental");
args.add("lastmodified");
args.add("--check-column");
- args.add("last_modified");
+ args.add("LAST_MODIFIED");
}
args.add("--columns");
args.add("id");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 5010cf2..cc1a3a9 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -160,7 +160,7 @@ public class TestMerge extends BaseSqoopTestCase {
// Do an import of this data into the "old" dataset.
options.setTargetDir(new Path(warehouse, "merge-old").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
- options.setIncrementalTestColumn("lastmod");
+ options.setIncrementalTestColumn("LASTMOD");
ImportTool importTool = new ImportTool();
Sqoop importer = new Sqoop(importTool, options.getConf(), options);
@@ -204,7 +204,7 @@ public class TestMerge extends BaseSqoopTestCase {
options.setNumMappers(1);
options.setTargetDir(new Path(warehouse, "merge-new").toString());
options.setIncrementalMode(IncrementalMode.DateLastModified);
- options.setIncrementalTestColumn("lastmod");
+ options.setIncrementalTestColumn("LASTMOD");
options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
importTool = new ImportTool();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/ThirdPartyTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
index ada5c72..b2db1b4 100644
--- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java
+++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
@@ -44,6 +44,7 @@ import com.cloudera.sqoop.manager.PostgresqlImportTest;
import org.apache.sqoop.manager.mysql.MySqlCallExportTest;
import org.apache.sqoop.manager.oracle.OracleCallExportTest;
+import org.apache.sqoop.manager.oracle.OracleIncrementalImportTest;
import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportDelimitedFileManualTest;
import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportSequenceFileManualTest;
import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeImportDelimitedFileManualTest;
@@ -82,6 +83,7 @@ public final class ThirdPartyTests extends TestCase {
suite.addTestSuite(OracleExportTest.class);
suite.addTestSuite(OracleManagerTest.class);
suite.addTestSuite(OracleCompatTest.class);
+ suite.addTestSuite(OracleIncrementalImportTest.class);
// SQL Server
suite.addTestSuite(SQLServerDatatypeExportDelimitedFileManualTest.class);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 877d7f8..793c23e 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -306,8 +306,6 @@ public abstract class BaseSqoopTestCase extends TestCase {
PreparedStatement statement = null;
String createTableStr = null;
String columnDefStr = "";
- String columnListStr = "";
- String valueListStr = "";
try {
try {
@@ -344,10 +342,13 @@ public abstract class BaseSqoopTestCase extends TestCase {
}
}
- if (vals!=null) {
+ for (int count=0; vals != null && count < vals.length/colTypes.length;
+ ++count ) {
+ String columnListStr = "";
+ String valueListStr = "";
for (int i = 0; i < colTypes.length; i++) {
columnListStr += colNames[i];
- valueListStr += vals[i];
+ valueListStr += vals[count * colTypes.length + i];
if (i < colTypes.length - 1) {
columnListStr += ", ";
valueListStr += ", ";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
new file mode 100644
index 0000000..3bbb1b1
--- /dev/null
+++ b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.OracleUtils;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test incremental import with the Oracle db.
+ */
+public class OracleIncrementalImportTest extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(
+ OracleIncrementalImportTest.class.getName());
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return OracleUtils.CONNECT_STRING;
+ }
+
+ @Override
+ protected SqoopOptions getSqoopOptions(Configuration conf) {
+ SqoopOptions opts = new SqoopOptions(conf);
+ OracleUtils.setOracleAuth(opts);
+ return opts;
+ }
+
+ @Override
+ protected void dropTableIfExists(String table) throws SQLException {
+ OracleUtils.dropTable(table, getManager());
+ }
+
+ /** the names of the tables we're creating. */
+ private List<String> tableNames;
+
+ @Override
+ public void tearDown() {
+ // Clean up the database on our way out.
+ for (String tableName : tableNames) {
+ try {
+ dropTableIfExists(tableName);
+ } catch (SQLException e) {
+ LOG.warn("Error trying to drop table '" + tableName
+ + "' on tearDown: " + e);
+ }
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Create the argv to pass to Sqoop.
+ * @param tableName tableName to be used.
+ * @param connPropsFileName connection properties to use
+ * @param checkColumnName name of the column to use for check-column
+ * @return the argv as an array of strings.
+ */
+ protected String [] getArgv(String tableName, String connPropsFileName,
+ String checkColumnName) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ CommonArgs.addHadoopFlags(args);
+
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("--target-dir");
+ args.add(getWarehouseDir());
+ args.add("--num-mappers");
+ args.add("1");
+ args.add("--table");
+ args.add(tableName);
+ args.add("--incremental");
+ args.add("lastmodified");
+ args.add("--check-column");
+ args.add(checkColumnName);
+ args.add("--last-value");
+ args.add("2000-01-01 01:01:01.0");
+ args.add("--connection-param-file");
+ args.add(connPropsFileName);
+
+ return args.toArray(new String[0]);
+ }
+
+ /**
+ * Create a table with a date column. Run incremental import on the table
+ * with date column as check-column.
+ */
+ public void testIncrementalImportWithLastModified() throws IOException {
+ tableNames = new ArrayList<String>();
+ String [] types = { "INT", "VARCHAR(10)", "DATE", };
+ String [] vals = {
+ "1", "'old_data'",
+ "TO_DATE('1999-01-01 11:11:11', 'YYYY-MM-DD HH24:MI:SS')",
+ "2", "'new_data'",
+ "TO_DATE('2000-11-11 23:23:23', 'YYYY-MM-DD HH24:MI:SS')", };
+ String tableName = getTableName();
+ tableNames.add(tableName);
+ createTableWithColTypes(types, vals);
+ // Some versions of Oracle's jdbc drivers automatically convert dates to
+ // timestamp. Since we don't want this to happen for this test,
+ // we must explicitly use a property file to control this behavior.
+ String connPropsFileName = "connection.properties";
+ createFileWithContent(connPropsFileName,
+ "oracle.jdbc.mapDateToTimestamp=false");
+ String[] args = getArgv(tableName, connPropsFileName, getColName(2));
+ runImport(args);
+
+ Path warehousePath = new Path(this.getWarehouseDir());
+ Path filePath = new Path(warehousePath, "part-m-00000");
+ String output = readLineFromPath(filePath);
+ String expectedVal = "2,new_data,2000-11-11";
+ assertEquals("Incremental import result expected a different string",
+ expectedVal, output);
+ }
+
+ private void createFileWithContent(String connPropsFileName,
+ String fileContent) throws IOException {
+ File file = new File(connPropsFileName);
+ if(file.exists())
+ file.delete();
+ Writer writer = new BufferedWriter(new FileWriter(connPropsFileName));
+ writer.write(fileContent);
+ writer.close();
+ }
+
+ private String readLineFromPath(Path filePath) throws IOException {
+ BufferedReader reader = null;
+ if (!isOnPhysicalCluster()) {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(
+ new File(filePath.toString()))));
+ } else {
+ FileSystem dfs = FileSystem.get(getConf());
+ FSDataInputStream dis = dfs.open(filePath);
+ reader = new BufferedReader(new InputStreamReader(dis));
+ }
+ String line = null;
+ try {
+ line = reader.readLine();
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ return line;
+ }
+
+}
+