You are viewing a plain-text version of this message; the link to the canonical version was not preserved in this copy.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/01 20:13:40 UTC
sqoop git commit: SQOOP-2811: Sqoop2: Extracting sequence files may result in duplicates
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 0c20d1f06 -> 118aa7c4f
SQOOP-2811: Sqoop2: Extracting sequence files may result in duplicates
(Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/118aa7c4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/118aa7c4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/118aa7c4
Branch: refs/heads/sqoop2
Commit: 118aa7c4f9cb7ed3a81ce792e7bf56d31f9107e5
Parents: 0c20d1f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Feb 1 11:13:17 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Feb 1 11:13:17 2016 -0800
----------------------------------------------------------------------
.../sqoop/connector/hdfs/HdfsExtractor.java | 42 ++-
.../sqoop/connector/hdfs/HdfsPartition.java | 6 +
.../connector/hdfs/SqoopTaskAttemptContext.java | 272 +++++++++++++++++++
3 files changed, 297 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index a813c47..b430739 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
@@ -69,7 +72,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
LOG.info("Working on partition: " + p);
int numFiles = p.getNumberOfFiles();
for (int i = 0; i < numFiles; i++) {
- extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
+ extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i), p.getLocations());
}
return null;
}
@@ -81,7 +84,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
private void extractFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
- Path file, long start, long length)
+ Path file, long start, long length, String[] locations)
throws IOException {
long end = start + length;
LOG.info("Extracting file " + file);
@@ -89,9 +92,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
LOG.info("\t to offset " + end);
LOG.info("\t of length " + length);
if(isSequenceFile(file)) {
- extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
+ extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
} else {
- extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
+ extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
}
}
@@ -105,29 +108,22 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
@SuppressWarnings("deprecation")
private void extractSequenceFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
- Path file, long start, long length)
+ Path file, long start, long length, String[] locations)
throws IOException {
LOG.info("Extracting sequence file");
- long end = start + length;
- SequenceFile.Reader filereader = new SequenceFile.Reader(
- file.getFileSystem(conf), file, conf);
-
- if (start > filereader.getPosition()) {
- filereader.sync(start); // sync to start
- }
+ SequenceFileRecordReader<Text, NullWritable> sequenceFileRecordReader = new SequenceFileRecordReader();
- Text line = new Text();
- boolean hasNext = filereader.next(line);
- while (hasNext) {
- rowsRead++;
- extractRow(linkConfiguration, fromJobConfiguration, line);
- line = new Text();
- hasNext = filereader.next(line);
- if (filereader.getPosition() >= end && filereader.syncSeen()) {
- break;
+ try {
+ sequenceFileRecordReader.initialize(new FileSplit(file, start, length, locations), new SqoopTaskAttemptContext(conf) );
+ while (sequenceFileRecordReader.nextKeyValue()) {
+ rowsRead++;
+ extractRow(linkConfiguration, fromJobConfiguration, sequenceFileRecordReader.getCurrentKey());
}
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ sequenceFileRecordReader.close();
}
- filereader.close();
}
/**
@@ -140,7 +136,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
@SuppressWarnings("resource")
private void extractTextFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
- Path file, long start, long length)
+ Path file, long start, long length, String[] locations)
throws IOException {
LOG.info("Extracting text file");
long end = start + length;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
index 644de60..3af5fa9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
@@ -72,6 +72,12 @@ public class HdfsPartition extends Partition {
return lengths[i];
}
+ public String[] getLocations() {
+ String[] locationsCopy = new String[locations.length];
+ System.arraycopy(locations, 0, locationsCopy, 0, locations.length);
+ return locationsCopy;
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
numFiles = in.readInt();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/118aa7c4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
new file mode 100644
index 0000000..5bec482
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/SqoopTaskAttemptContext.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.net.URI;
+
+// Simple TaskAttemptContext wrapper for passing through a conf object
+// for hadoop classes
+public class SqoopTaskAttemptContext implements TaskAttemptContext {
+
+ private Configuration conf;
+
+ public SqoopTaskAttemptContext(Configuration conf){
+ this.conf = conf;
+ }
+
+ @Override
+ public TaskAttemptID getTaskAttemptID() {
+ return null;
+ }
+
+ @Override
+ public void setStatus(String msg) {
+
+ }
+
+ @Override
+ public String getStatus() {
+ return null;
+ }
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ return null;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ return null;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return null;
+ }
+
+ @Override
+ public int getNumReduceTasks() {
+ return 0;
+ }
+
+ @Override
+ public Path getWorkingDirectory() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<?> getOutputKeyClass() {
+ return null;
+ }
+
+ @Override
+ public Class<?> getOutputValueClass() {
+ return null;
+ }
+
+ @Override
+ public Class<?> getMapOutputKeyClass() {
+ return null;
+ }
+
+ @Override
+ public Class<?> getMapOutputValueClass() {
+ return null;
+ }
+
+ @Override
+ public String getJobName() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws
+ ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws
+ ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws
+ ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws
+ ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws
+ ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public RawComparator<?> getSortComparator() {
+ return null;
+ }
+
+ @Override
+ public String getJar() {
+ return null;
+ }
+
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return null;
+ }
+
+ @Override
+ public RawComparator<?> getGroupingComparator() {
+ return null;
+ }
+
+ @Override
+ public boolean getJobSetupCleanupNeeded() {
+ return false;
+ }
+
+ @Override
+ public boolean getTaskCleanupNeeded() {
+ return false;
+ }
+
+ @Override
+ public boolean getProfileEnabled() {
+ return false;
+ }
+
+ @Override
+ public String getProfileParams() {
+ return null;
+ }
+
+ @Override
+ public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+ return null;
+ }
+
+ @Override
+ public String getUser() {
+ return null;
+ }
+
+ @Override
+ public boolean getSymlink() {
+ return false;
+ }
+
+ @Override
+ public Path[] getArchiveClassPaths() {
+ return new Path[0];
+ }
+
+ @Override
+ public URI[] getCacheArchives() throws IOException {
+ return new URI[0];
+ }
+
+ @Override
+ public URI[] getCacheFiles() throws IOException {
+ return new URI[0];
+ }
+
+ @Override
+ public Path[] getLocalCacheArchives() throws IOException {
+ return new Path[0];
+ }
+
+ @Override
+ public Path[] getLocalCacheFiles() throws IOException {
+ return new Path[0];
+ }
+
+ @Override
+ public Path[] getFileClassPaths() {
+ return new Path[0];
+ }
+
+ @Override
+ public String[] getArchiveTimestamps() {
+ return new String[0];
+ }
+
+ @Override
+ public String[] getFileTimestamps() {
+ return new String[0];
+ }
+
+ @Override
+ public int getMaxMapAttempts() {
+ return 0;
+ }
+
+ @Override
+ public int getMaxReduceAttempts() {
+ return 0;
+ }
+
+ @Override
+ public void progress() {
+
+ }
+}
\ No newline at end of file