You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by an...@apache.org on 2017/08/22 09:01:16 UTC

sqoop git commit: SQOOP-3139: sqoop tries to re execute select query during import in case of a connection reset error and this is causing lots of duplicate records from source

Repository: sqoop
Updated Branches:
  refs/heads/trunk 92e2f9992 -> d2bdef496


SQOOP-3139: sqoop tries to re execute select query during import in case of a connection reset error and this is causing lots of duplicate records from source

(Zoltan Toth via Anna Szonyi)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2bdef49
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2bdef49
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2bdef49

Branch: refs/heads/trunk
Commit: d2bdef49669c400691d35158562dab9adc12a527
Parents: 92e2f99
Author: Anna Szonyi <an...@apache.org>
Authored: Tue Aug 22 10:58:40 2017 +0200
Committer: Anna Szonyi <an...@apache.org>
Committed: Tue Aug 22 10:58:40 2017 +0200

----------------------------------------------------------------------
 .../sqoop/mapreduce/db/DBRecordReader.java      |   4 +
 .../mapreduce/db/SQLServerDBRecordReader.java   |  57 ++++-
 .../db/TestSQLServerDBRecordReader.java         | 214 +++++++++++++++++++
 3 files changed, 264 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
index a78eb06..eed5780 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
@@ -332,4 +332,8 @@ public class DBRecordReader<T extends DBWritable> extends
   protected Configuration getConf(){
     return conf;
   }
+
+  ResultSet getResultSet() {
+    return results;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
index 9a3621b..1dea842 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce.db;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,7 @@ import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 
-import  org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.lib.SqoopRecord;
 
 /**
  * A RecordReader that reads records from a SQL table.
@@ -41,7 +42,7 @@ import  org.apache.sqoop.lib.SqoopRecord;
  * connection failure handler
  */
 public class SQLServerDBRecordReader<T extends SqoopRecord> extends
-      SqlServerRecordReader<T> {
+    SqlServerRecordReader<T> {
 
   private static final Log LOG =
       LogFactory.getLog(SQLServerDBRecordReader.class);
@@ -55,7 +56,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
   // Name of the split column used to re-generate selectQueries after
   // connection failures
   private String splitColumn;
-  private String lastRecordKey;
+  private String lastRecordValue;
 
   public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split,
       Class<T> inputClass, Configuration conf, Connection conn,
@@ -67,15 +68,48 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
   @Override
   /** {@inheritDoc} */
   public T getCurrentValue() {
-    T val = super.getCurrentValue();
-    // Lookup the key of the last read record to use for recovering
-    // As documented, the map may not be null, though it may be empty.
-    Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
-    lastRecordKey = (lastRecordSplitCol == null) ? null
-        : lastRecordSplitCol.toString();
+    T val = currentValue();
+
+    saveCurrentValue(val);
+
     return val;
   }
 
+  T currentValue() {
+    return super.getCurrentValue();
+  }
+
+  void saveCurrentValue(T value) {
+    lastRecordValue = getCurrentValueOfSplitByColumnFromORM(value, splitColumn);
+  }
+
+  private String getCurrentValueOfSplitByColumnFromORM(T generatedORMRecord, String columnName) {
+    Object result = generatedORMRecord.getFieldMap().get(columnName);
+    if (result != null) {
+      return result.toString();
+    }
+    return getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(generatedORMRecord, columnName);
+  }
+
+  /*
+   * SQOOP-3139: This is a workaround for the case where the database/table/column is used in case-insensitive
+   * mode and the user runs Sqoop import with --split-by, but the supplied parameter does not exactly match the
+   * column name, i.e. columnName.equals(splitBy) is false while columnName.equalsIgnoreCase(splitBy) is true.
+   *
+   */
+  private String getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(T generatedORMRecord, String columnName) {
+    for (Map.Entry<String, Object> fields : generatedORMRecord.getFieldMap().entrySet()) {
+      if (columnName.equalsIgnoreCase(fields.getKey())) {
+        return fields.getValue() != null ? fields.getValue().toString() : null;
+      }
+    }
+    return null;
+  }
+
+  String getLastRecordValue() {
+    return lastRecordValue;
+  }
+
   /**
    * Load the SQLFailureHandler configured for use by the record reader.
    */
@@ -202,7 +236,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
     // Last seen record key is only expected to be unavailable if no reads
     // ever happened
     String selectQuery;
-    if (lastRecordKey == null) {
+    if (lastRecordValue == null) {
       selectQuery = super.getSelectQuery();
     } else {
       // If last record key is available, construct the select query to start
@@ -212,7 +246,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
       StringBuilder lowerClause = new StringBuilder();
       lowerClause.append(getDBConf().getInputOrderBy());
       lowerClause.append(" > ");
-      lowerClause.append(lastRecordKey.toString());
+      lowerClause.append(lastRecordValue.toString());
 
       // Get the select query with the lowerClause, and split upper clause
       selectQuery = getSelectQuery(lowerClause.toString(),
@@ -221,4 +255,5 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends
 
     return selectQuery;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
new file mode 100644
index 0000000..fc04a90
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java
@@ -0,0 +1,214 @@
+package org.apache.sqoop.mapreduce.db;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestSQLServerDBRecordReader {
+
+  private static final String SPLIT_BY_COLUMN = "myCol";
+  private static final String COL_NAME_SAME_AS_SPLIT_BY = SPLIT_BY_COLUMN;
+  private static final String UPPERCASE_COL_NAME = SPLIT_BY_COLUMN.toUpperCase();
+  private static final String ANY_VALUE_FOR_COL = "Value";
+  private static final String NULL_VALUE_FOR_COL = null;
+
+  private SQLServerDBRecordReader reader;
+
+  @Before
+  public void before() throws Exception {
+    DBInputFormat.DBInputSplit split = mock(DBInputFormat.DBInputSplit.class);
+    Configuration conf = new Configuration();
+    conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS, SQLFailureHandlerStub.class.getName());
+    Connection connection = mock(Connection.class);
+    DBConfiguration dbConfiguration = mock(DBConfiguration.class);
+    when(dbConfiguration.getInputOrderBy()).thenReturn(SPLIT_BY_COLUMN);
+
+    reader = spy(new SQLServerDBRecordReader(split, SqlTableClassStub.class, conf, connection, dbConfiguration, "", new String[]{}, "", ""));
+
+
+    doAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return StringUtils.EMPTY;
+      }
+    }).when(reader).getSelectQuery();
+
+    doAnswer(new Answer<ResultSet>() {
+      @Override
+      public ResultSet answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return mock(ResultSet.class);
+      }
+    }).when(reader).executeQuery(anyString());
+
+    reader.initialize(mock(InputSplit.class), mock(TaskAttemptContext.class));
+
+  }
+
+  @Test
+  public void returnNullIfTheLastRecordValueIsNull() {
+    when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, NULL_VALUE_FOR_COL));
+    reader.getCurrentValue();
+    assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue());
+  }
+
+  @Test
+  public void returnNullIfTheLastRecordValueIsNullAndColumnNameIsDifferent() {
+    when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, NULL_VALUE_FOR_COL));
+    reader.getCurrentValue();
+    assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue());
+  }
+
+  @Test
+  public void returnLastSavedValueWhenColumNameIsTheSameSplitByColumn() {
+    when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, ANY_VALUE_FOR_COL));
+    reader.getCurrentValue();
+
+    assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue());
+  }
+
+  /*
+   * This test is intended to verify the case where the column name and the --split-by
+   * parameter differ only in letter case (e.g. mycol vs. MyCol) on a case-insensitive DB.
+   */
+  @Test
+  public void returnLastSavedValueWhenColumnNameDifferentFromSplitByColumn() {
+    when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, ANY_VALUE_FOR_COL));
+    reader.getCurrentValue();
+
+    assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue());
+  }
+
+  private static class SqlTableClassStub extends SqoopRecord {
+    private String colName;
+    private String colValue;
+
+    public SqlTableClassStub(String colName, String colValue) {
+      this.colName = colName;
+      this.colValue = colValue;
+    }
+
+    @Override
+    public Map<String, Object> getFieldMap() {
+      return new HashMap<String, Object>() {{
+        put(colName, colValue);
+      }};
+    }
+
+    @Override
+    public void parse(CharSequence s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void parse(Text s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void parse(byte[] s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void parse(char[] s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void parse(ByteBuffer s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void parse(CharBuffer s) throws RecordParser.ParseError {
+
+    }
+
+    @Override
+    public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException {
+
+    }
+
+    @Override
+    public int write(PreparedStatement stmt, int offset) throws SQLException {
+      return 0;
+    }
+
+    @Override
+    public String toString(DelimiterSet delimiters) {
+      return null;
+    }
+
+    @Override
+    public int getClassFormatVersion() {
+      return 0;
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+
+    }
+
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+
+    }
+
+
+  }
+
+  private static class SQLFailureHandlerStub extends SQLFailureHandler {
+
+    @Override
+    public boolean canHandleFailure(Throwable failureCause) {
+      return false;
+    }
+
+    @Override
+    public Connection recover() throws IOException {
+      return null;
+    }
+  }
+
+}
\ No newline at end of file