Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/01 20:13:40 UTC

sqoop git commit: SQOOP-2811: Sqoop2: Extracting sequence files may result in duplicates

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 0c20d1f06 -> 118aa7c4f


SQOOP-2811: Sqoop2: Extracting sequence files may result in duplicates

(Abraham Fine via Jarek Jarcec Cecho)
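
For context: the previous extractor drove SequenceFile.Reader by hand,
calling sync(start) and breaking out once the read position passed the
split end with a sync marker seen. Because adjacent splits applied that
boundary logic independently, records near a sync point could be emitted
by more than one split. The patch instead delegates to Hadoop's
SequenceFileRecordReader, which follows the standard split contract for
sequence files: begin at the first sync point at or after the split
start, stop at the first sync point past the split end, so every record
belongs to exactly one split.

A minimal standalone sketch of the pattern the patch adopts, assuming a
class in the same package as the new SqoopTaskAttemptContext; the file
path, split bounds, and class name are hypothetical placeholders, not
taken from the commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

  public class SequenceSplitReadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Hypothetical file and split bounds; a real caller derives these
      // from its partitioning logic.
      Path file = new Path("/tmp/example.seq");
      long start = 0L;
      long length = 64L * 1024 * 1024;
      String[] locations = new String[0];

      SequenceFileRecordReader<Text, NullWritable> reader =
          new SequenceFileRecordReader<Text, NullWritable>();
      try {
        reader.initialize(new FileSplit(file, start, length, locations),
            new SqoopTaskAttemptContext(conf));
        // The reader starts at the first sync point at or after 'start'
        // and stops at the first sync point past 'start + length', so
        // two adjacent splits never emit the same record.
        while (reader.nextKeyValue()) {
          System.out.println(reader.getCurrentKey());
        }
      } finally {
        reader.close();
      }
    }
  }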


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/118aa7c4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/118aa7c4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/118aa7c4

Branch: refs/heads/sqoop2
Commit: 118aa7c4f9cb7ed3a81ce792e7bf56d31f9107e5
Parents: 0c20d1f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Feb 1 11:13:17 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Feb 1 11:13:17 2016 -0800

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsExtractor.java     |  42 ++-
 .../sqoop/connector/hdfs/HdfsPartition.java     |   6 +
 .../connector/hdfs/SqoopTaskAttemptContext.java | 272 +++++++++++++++++++
 3 files changed, 297 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index a813c47..b430739 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -69,7 +72,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
           LOG.info("Working on partition: " + p);
           int numFiles = p.getNumberOfFiles();
           for (int i = 0; i < numFiles; i++) {
-            extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
+            extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i), p.getLocations());
           }
           return null;
         }
@@ -81,7 +84,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   private void extractFile(LinkConfiguration linkConfiguration,
                            FromJobConfiguration fromJobConfiguration,
-                           Path file, long start, long length)
+                           Path file, long start, long length, String[] locations)
       throws IOException {
     long end = start + length;
     LOG.info("Extracting file " + file);
@@ -89,9 +92,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     } else {
-      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
     }
   }
 
@@ -105,29 +108,22 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   @SuppressWarnings("deprecation")
   private void extractSequenceFile(LinkConfiguration linkConfiguration,
                                    FromJobConfiguration fromJobConfiguration,
-                                   Path file, long start, long length)
+                                   Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting sequence file");
-    long end = start + length;
-    SequenceFile.Reader filereader = new SequenceFile.Reader(
-        file.getFileSystem(conf), file, conf);
-
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
+    SequenceFileRecordReader<Text, NullWritable> sequenceFileRecordReader = new SequenceFileRecordReader<Text, NullWritable>();
 
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
-      rowsRead++;
-      extractRow(linkConfiguration, fromJobConfiguration, line);
-      line = new Text();
-      hasNext = filereader.next(line);
-      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
+    try {
+      sequenceFileRecordReader.initialize(new FileSplit(file, start, length, locations), new SqoopTaskAttemptContext(conf));
+      while (sequenceFileRecordReader.nextKeyValue()) {
+        rowsRead++;
+        extractRow(linkConfiguration, fromJobConfiguration, sequenceFileRecordReader.getCurrentKey());
       }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      sequenceFileRecordReader.close();
     }
-    filereader.close();
   }
 
   /**
@@ -140,7 +136,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   @SuppressWarnings("resource")
   private void extractTextFile(LinkConfiguration linkConfiguration,
                                FromJobConfiguration fromJobConfiguration,
-                               Path file, long start, long length)
+                               Path file, long start, long length, String[] locations)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
index 644de60..3af5fa9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
@@ -72,6 +72,12 @@ public class HdfsPartition extends Partition {
     return lengths[i];
   }
 
+  public String[] getLocations() {
+    String[] locationsCopy = new String[locations.length];
+    System.arraycopy(locations, 0, locationsCopy, 0, locations.length);
+    return locationsCopy;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     numFiles = in.readInt();
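
A side note on the HdfsPartition change above: getLocations() returns a
defensive copy so callers cannot mutate the partition's internal array.
An equivalent one-liner inside HdfsPartition, shown only as an aside (the
patch itself uses System.arraycopy), would be java.util.Arrays.copyOf:

  import java.util.Arrays;

  public String[] getLocations() {
    // Defensive copy: callers may modify the returned array without
    // affecting this partition's state.
    return Arrays.copyOf(locations, locations.length);
  }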

http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
new file mode 100644
index 0000000..5bec482
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.net.URI;
+
+// Simple TaskAttemptContext wrapper that passes a Configuration object
+// through to Hadoop classes.
+public class SqoopTaskAttemptContext implements TaskAttemptContext {
+
+  private Configuration conf;
+
+  public SqoopTaskAttemptContext(Configuration conf){
+    this.conf = conf;
+  }
+
+  @Override
+  public TaskAttemptID getTaskAttemptID() {
+    return null;
+  }
+
+  @Override
+  public void setStatus(String msg) {
+
+  }
+
+  @Override
+  public String getStatus() {
+    return null;
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return null;
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return null;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public Credentials getCredentials() {
+    return null;
+  }
+
+  @Override
+  public JobID getJobID() {
+    return null;
+  }
+
+  @Override
+  public int getNumReduceTasks() {
+    return 0;
+  }
+
+  @Override
+  public Path getWorkingDirectory() throws IOException {
+    return null;
+  }
+
+  @Override
+  public Class<?> getOutputKeyClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getOutputValueClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getMapOutputKeyClass() {
+    return null;
+  }
+
+  @Override
+  public Class<?> getMapOutputValueClass() {
+    return null;
+  }
+
+  @Override
+  public String getJobName() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws
+    ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws
+    ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws
+    ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws
+    ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+    throws ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws
+    ClassNotFoundException {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getSortComparator() {
+    return null;
+  }
+
+  @Override
+  public String getJar() {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return null;
+  }
+
+  @Override
+  public RawComparator<?> getGroupingComparator() {
+    return null;
+  }
+
+  @Override
+  public boolean getJobSetupCleanupNeeded() {
+    return false;
+  }
+
+  @Override
+  public boolean getTaskCleanupNeeded() {
+    return false;
+  }
+
+  @Override
+  public boolean getProfileEnabled() {
+    return false;
+  }
+
+  @Override
+  public String getProfileParams() {
+    return null;
+  }
+
+  @Override
+  public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+    return null;
+  }
+
+  @Override
+  public String getUser() {
+    return null;
+  }
+
+  @Override
+  public boolean getSymlink() {
+    return false;
+  }
+
+  @Override
+  public Path[] getArchiveClassPaths() {
+    return new Path[0];
+  }
+
+  @Override
+  public URI[] getCacheArchives() throws IOException {
+    return new URI[0];
+  }
+
+  @Override
+  public URI[] getCacheFiles() throws IOException {
+    return new URI[0];
+  }
+
+  @Override
+  public Path[] getLocalCacheArchives() throws IOException {
+    return new Path[0];
+  }
+
+  @Override
+  public Path[] getLocalCacheFiles() throws IOException {
+    return new Path[0];
+  }
+
+  @Override
+  public Path[] getFileClassPaths() {
+    return new Path[0];
+  }
+
+  @Override
+  public String[] getArchiveTimestamps() {
+    return new String[0];
+  }
+
+  @Override
+  public String[] getFileTimestamps() {
+    return new String[0];
+  }
+
+  @Override
+  public int getMaxMapAttempts() {
+    return 0;
+  }
+
+  @Override
+  public int getMaxReduceAttempts() {
+    return 0;
+  }
+
+  @Override
+  public void progress() {
+
+  }
+}
\ No newline at end of file
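
A closing note on the new class: SqoopTaskAttemptContext is a pass-through
stub in which only getConfiguration() returns real data, and that is all
SequenceFileRecordReader.initialize() needs here. On Hadoop versions that
ship org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, an
alternative sketch (an assumption about that API, not part of this patch)
could reuse Hadoop's own implementation with a synthetic attempt id:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;
  import org.apache.hadoop.mapreduce.TaskAttemptID;
  import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

  public class ContextAlternativeSketch {
    public static TaskAttemptContext newContext(Configuration conf) {
      // Hadoop's concrete implementation with a default attempt id;
      // avoids hand-writing dozens of stub methods, at the cost of
      // depending on a class some distributions treat as internal.
      return new TaskAttemptContextImpl(conf, new TaskAttemptID());
    }
  }

The stub in the patch keeps the connector free of that dependency, a
reasonable trade-off for a read-only code path.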