You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by an...@apache.org on 2017/08/22 09:01:16 UTC
sqoop git commit: SQOOP-3139: Sqoop tries to re-execute the select query
during import after a connection reset error, which causes many
duplicate records from the source
Repository: sqoop
Updated Branches:
refs/heads/trunk 92e2f9992 -> d2bdef496
SQOOP-3139: Sqoop tries to re-execute the select query during import after a connection reset error, which causes many duplicate records from the source
(Zoltan Toth via Anna Szonyi)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2bdef49
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2bdef49
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2bdef49
Branch: refs/heads/trunk
Commit: d2bdef49669c400691d35158562dab9adc12a527
Parents: 92e2f99
Author: Anna Szonyi <an...@apache.org>
Authored: Tue Aug 22 10:58:40 2017 +0200
Committer: Anna Szonyi <an...@apache.org>
Committed: Tue Aug 22 10:58:40 2017 +0200
----------------------------------------------------------------------
.../sqoop/mapreduce/db/DBRecordReader.java | 4 +
.../mapreduce/db/SQLServerDBRecordReader.java | 57 ++++-
.../db/TestSQLServerDBRecordReader.java | 214 +++++++++++++++++++
3 files changed, 264 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
index a78eb06..eed5780 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
@@ -332,4 +332,8 @@ public class DBRecordReader<T extends DBWritable> extends
protected Configuration getConf(){
return conf;
}
+
+ ResultSet getResultSet() {
+ return results;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
index 9a3621b..1dea842 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,7 @@ import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.lib.SqoopRecord;
/**
* A RecordReader that reads records from a SQL table.
@@ -41,7 +42,7 @@ import org.apache.sqoop.lib.SqoopRecord;
* connection failure handler
*/
public class SQLServerDBRecordReader<T extends SqoopRecord> extends
- SqlServerRecordReader<T> {
+ SqlServerRecordReader<T> {
private static final Log LOG =
LogFactory.getLog(SQLServerDBRecordReader.class);
@@ -55,7 +56,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
// Name of the split column used to re-generate selectQueries after
// connection failures
private String splitColumn;
- private String lastRecordKey;
+ private String lastRecordValue;
public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split,
Class<T> inputClass, Configuration conf, Connection conn,
@@ -67,15 +68,48 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
@Override
/** {@inheritDoc} */
public T getCurrentValue() {
- T val = super.getCurrentValue();
- // Lookup the key of the last read record to use for recovering
- // As documented, the map may not be null, though it may be empty.
- Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
- lastRecordKey = (lastRecordSplitCol == null) ? null
- : lastRecordSplitCol.toString();
+ T val = currentValue();
+
+ saveCurrentValue(val);
+
return val;
}
+ T currentValue() {
+ return super.getCurrentValue();
+ }
+
+ void saveCurrentValue(T value) {
+ lastRecordValue = getCurrentValueOfSplitByColumnFromORM(value, splitColumn);
+ }
+
+ private String getCurrentValueOfSplitByColumnFromORM(T generatedORMRecord, String columnName) {
+ Object result = generatedORMRecord.getFieldMap().get(columnName);
+ if (result != null) {
+ return result.toString();
+ }
+ return getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(generatedORMRecord, columnName);
+ }
+
+  /*
+   * SQOOP-3139: fallback for case-insensitive databases. If --split-by differs from the
+   * actual column name only in case (e.g. "mycol" vs. "MyCol"), the exact field-map
+   * lookup misses, so scan the field names with equalsIgnoreCase instead. Returns the
+   * matched value's toString(), or null if no field matches, the value is null, or
+   * columnName itself is null (guarded so a missing split column cannot cause an NPE).
+   */
+  private String getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(T generatedORMRecord, String columnName) {
+    for (Map.Entry<String, Object> field : generatedORMRecord.getFieldMap().entrySet()) {
+      if (columnName != null && columnName.equalsIgnoreCase(field.getKey())) {
+        return field.getValue() != null ? field.getValue().toString() : null;
+      }
+    }
+    return null;
+  }
+
+ String getLastRecordValue() {
+ return lastRecordValue;
+ }
+
/**
* Load the SQLFailureHandler configured for use by the record reader.
*/
@@ -202,7 +236,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
// Last seen record key is only expected to be unavailable if no reads
// ever happened
String selectQuery;
- if (lastRecordKey == null) {
+ if (lastRecordValue == null) {
selectQuery = super.getSelectQuery();
} else {
// If last record key is available, construct the select query to start
@@ -212,7 +246,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
StringBuilder lowerClause = new StringBuilder();
lowerClause.append(getDBConf().getInputOrderBy());
lowerClause.append(" > ");
- lowerClause.append(lastRecordKey.toString());
+ lowerClause.append(lastRecordValue.toString());
// Get the select query with the lowerClause, and split upper clause
selectQuery = getSelectQuery(lowerClause.toString(),
@@ -221,4 +255,5 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
return selectQuery;
}
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
new file mode 100644
index 0000000..fc04a90
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
@@ -0,0 +1,214 @@
+package org.apache.sqoop.mapreduce.db;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestSQLServerDBRecordReader {
+
+ private static final String SPLIT_BY_COLUMN = "myCol";
+ private static final String COL_NAME_SAME_AS_SPLIT_BY = SPLIT_BY_COLUMN;
+ private static final String UPPERCASE_COL_NAME = SPLIT_BY_COLUMN.toUpperCase();
+ private static final String ANY_VALUE_FOR_COL = "Value";
+ private static final String NULL_VALUE_FOR_COL = null;
+
+ private SQLServerDBRecordReader reader;
+
+ @Before
+ public void before() throws Exception {
+ DBInputFormat.DBInputSplit split = mock(DBInputFormat.DBInputSplit.class);
+ Configuration conf = new Configuration();
+ conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS, SQLFailureHandlerStub.class.getName());
+ Connection connection = mock(Connection.class);
+ DBConfiguration dbConfiguration = mock(DBConfiguration.class);
+ when(dbConfiguration.getInputOrderBy()).thenReturn(SPLIT_BY_COLUMN);
+
+ reader = spy(new SQLServerDBRecordReader(split, SqlTableClassStub.class, conf, connection, dbConfiguration, "", new String[]{}, "", ""));
+
+
+ doAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return StringUtils.EMPTY;
+ }
+ }).when(reader).getSelectQuery();
+
+ doAnswer(new Answer<ResultSet>() {
+ @Override
+ public ResultSet answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return mock(ResultSet.class);
+ }
+ }).when(reader).executeQuery(anyString());
+
+ reader.initialize(mock(InputSplit.class), mock(TaskAttemptContext.class));
+
+ }
+
+ @Test
+ public void returnNullIfTheLastRecordValueIsNull() {
+ when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, NULL_VALUE_FOR_COL));
+ reader.getCurrentValue();
+ assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue());
+ }
+
+ @Test
+ public void returnNullIfTheLastRecordValueIsNullAndColumnNameIsDifferent() {
+ when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, NULL_VALUE_FOR_COL));
+ reader.getCurrentValue();
+ assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue());
+ }
+
+ @Test
+ public void returnLastSavedValueWhenColumNameIsTheSameSplitByColumn() {
+ when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, ANY_VALUE_FOR_COL));
+ reader.getCurrentValue();
+
+ assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue());
+ }
+
+  /*
+   * Verifies that the last value is still found when the column name and the --split-by
+   * parameter differ only in case (e.g. mycol vs. MyCol), as on a case-insensitive DB.
+   */
+ @Test
+ public void returnLastSavedValueWhenColumnNameDifferentFromSplitByColumn() {
+ when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, ANY_VALUE_FOR_COL));
+ reader.getCurrentValue();
+
+ assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue());
+ }
+
+ private static class SqlTableClassStub extends SqoopRecord {
+ private String colName;
+ private String colValue;
+
+ public SqlTableClassStub(String colName, String colValue) {
+ this.colName = colName;
+ this.colValue = colValue;
+ }
+
+ @Override
+ public Map<String, Object> getFieldMap() {
+ return new HashMap<String, Object>() {{
+ put(colName, colValue);
+ }};
+ }
+
+ @Override
+ public void parse(CharSequence s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void parse(Text s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void parse(byte[] s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void parse(char[] s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void parse(ByteBuffer s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void parse(CharBuffer s) throws RecordParser.ParseError {
+
+ }
+
+ @Override
+ public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException {
+
+ }
+
+ @Override
+ public int write(PreparedStatement stmt, int offset) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public String toString(DelimiterSet delimiters) {
+ return null;
+ }
+
+ @Override
+ public int getClassFormatVersion() {
+ return 0;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+
+ }
+
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+
+ }
+
+
+ }
+
+ private static class SQLFailureHandlerStub extends SQLFailureHandler {
+
+ @Override
+ public boolean canHandleFailure(Throwable failureCause) {
+ return false;
+ }
+
+ @Override
+ public Connection recover() throws IOException {
+ return null;
+ }
+ }
+
+}
\ No newline at end of file