You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/10/16 00:09:00 UTC
git commit: SQOOP-533: Intermediate data format support for import
Updated Branches:
refs/heads/sqoop2 b61de724e -> 27aa78679
SQOOP-533: Intermediate data format support for import
(Bilung Lee via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/27aa7867
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/27aa7867
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/27aa7867
Branch: refs/heads/sqoop2
Commit: 27aa786793887623af655774c7fc4022590e0967
Parents: b61de72
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Oct 15 15:07:38 2012 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Oct 15 15:07:38 2012 -0700
----------------------------------------------------------------------
.../sqoop/connector/jdbc/TestImportExtractor.java | 7 +-
.../sqoop/job/etl/HdfsSequenceImportLoader.java | 16 +-
.../apache/sqoop/job/etl/HdfsTextImportLoader.java | 10 +-
.../main/java/org/apache/sqoop/job/io/Data.java | 157 ++++++++++-----
.../java/org/apache/sqoop/job/mr/SqoopMapper.java | 19 ++-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 18 +-
.../test/java/org/apache/sqoop/io/TestData.java | 76 +++++++
.../test/java/org/apache/sqoop/job/FileUtils.java | 29 +++
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 48 ++---
.../java/org/apache/sqoop/job/TestMapReduce.java | 34 ++--
.../java/org/apache/sqoop/job/io/DataReader.java | 4 +-
.../java/org/apache/sqoop/job/io/DataWriter.java | 4 +-
12 files changed, 302 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 519286b..70e29e5 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -144,6 +144,11 @@ public class TestImportExtractor extends TestCase {
int indx = START;
@Override
+ public void setFieldDelimiter(char delimiter) {
+   // Intentionally a no-op: this test writer always keeps the default delimiter.
+ }
+
+ @Override
public void writeArrayRecord(Object[] array) {
for (int i = 0; i < array.length; i++) {
if (array[i] instanceof Integer) {
@@ -163,7 +168,7 @@ public class TestImportExtractor extends TestCase {
}
@Override
- public void writeRecord(Object record) {
+ public void writeContent(Object content, int type) {
fail("This method should not be invoked.");
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index 8802cbc..854d325 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -37,18 +37,18 @@ import org.apache.sqoop.utils.ClassLoadingUtils;
public class HdfsSequenceImportLoader extends Loader {
- public static final String extension = ".seq";
+ public static final String EXTENSION = ".seq";
private final char fieldDelimiter;
- private final char recordDelimiter;
public HdfsSequenceImportLoader() {
fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
}
@Override
public void run(Context context, DataReader reader) {
+ reader.setFieldDelimiter(fieldDelimiter);
+
Configuration conf = ((EtlContext)context).getConfiguration();
String filename =
context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
@@ -71,12 +71,12 @@ public class HdfsSequenceImportLoader extends Loader {
}
}
- filename += extension;
+ filename += EXTENSION;
try {
Path filepath = new Path(filename);
SequenceFile.Writer filewriter;
- if (codecname != null) {
+ if (codec != null) {
filewriter = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(filepath),
SequenceFile.Writer.keyClass(Text.class),
@@ -90,10 +90,10 @@ public class HdfsSequenceImportLoader extends Loader {
SequenceFile.Writer.compression(CompressionType.NONE));
}
- Object record;
+ String csv;
Text text = new Text();
- while ((record = reader.readRecord()) != null) {
- text.set(Data.format(record, fieldDelimiter, recordDelimiter));
+ while ((csv = reader.readCsvRecord()) != null) {
+ text.set(csv);
filewriter.append(text, NullWritable.get());
}
filewriter.close();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index b1ee255..240265b 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -47,6 +47,8 @@ public class HdfsTextImportLoader extends Loader {
@Override
public void run(Context context, DataReader reader) {
+ reader.setFieldDelimiter(fieldDelimiter);
+
Configuration conf = ((EtlContext)context).getConfiguration();
String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
@@ -76,7 +78,7 @@ public class HdfsTextImportLoader extends Loader {
BufferedWriter filewriter;
DataOutputStream filestream = fs.create(filepath, false);
- if (codecname != null) {
+ if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
Data.CHARSET_NAME));
@@ -85,9 +87,9 @@ public class HdfsTextImportLoader extends Loader {
filestream, Data.CHARSET_NAME));
}
- Object record;
- while ((record = reader.readRecord()) != null) {
- filewriter.write(Data.format(record, fieldDelimiter, recordDelimiter));
+ String csv;
+ while ((csv = reader.readCsvRecord()) != null) {
+ filewriter.write(csv + recordDelimiter);
}
filewriter.close();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java
index 2732e83..4ddd132 100644
--- a/core/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/core/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.regex.Matcher;
@@ -43,26 +44,45 @@ public class Data implements WritableComparable<Data> {
public static final int ARRAY_RECORD = 2;
private int type = EMPTY_DATA;
- public static final char DEFAULT_FIELD_DELIMITER = ',';
- public static final char DEFAULT_RECORD_DELIMITER = '\n';
public static final String CHARSET_NAME = "UTF-8";
- public void setContent(Object content) {
- if (content == null) {
- this.type = EMPTY_DATA;
- } else if (content instanceof String) {
- this.type = CSV_RECORD;
- } else if (content instanceof Object[]) {
- this.type = ARRAY_RECORD;
- } else {
- throw new SqoopException(CoreError.CORE_0012,
- content.getClass().getName());
+ public static final char DEFAULT_RECORD_DELIMITER = '\n';
+ public static final char DEFAULT_FIELD_DELIMITER = ',';
+ public static final char DEFAULT_STRING_DELIMITER = '\'';
+ public static final char DEFAULT_STRING_ESCAPE = '\\';
+ private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+ private char stringDelimiter = DEFAULT_STRING_DELIMITER;
+ private char stringEscape = DEFAULT_STRING_ESCAPE;
+ private String escapedStringDelimiter = String.valueOf(new char[] {
+ stringEscape, stringDelimiter
+ });
+
+ /**
+  * Sets the field delimiter used when this record is rendered as CSV.
+  */
+ public void setFieldDelimiter(char delimiter) {
+   fieldDelimiter = delimiter;
+ }
+
+ public void setContent(Object content, int type) {
+ switch (type) {
+ case EMPTY_DATA:
+ case CSV_RECORD:
+ case ARRAY_RECORD:
+ this.type = type;
+ this.content = content;
+ break;
+ default:
+ throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
}
- this.content = content;
}
- public Object getContent() {
- return content;
+ /**
+  * Returns the stored content converted to the requested representation.
+  *
+  * @param targetType EMPTY_DATA, CSV_RECORD or ARRAY_RECORD
+  * @return null for EMPTY_DATA (or when nothing is stored), a CSV String for
+  *         CSV_RECORD, an Object[] for ARRAY_RECORD
+  * @throws SqoopException (CORE_0012) for an unknown target type
+  */
+ public Object getContent(int targetType) {
+   switch (targetType) {
+   case EMPTY_DATA:
+     // Callers such as SqoopOutputFormatLoadExecutor pass getType() straight
+     // back in as the target type, so empty data must round-trip as null
+     // instead of being rejected as an unknown type.
+     return null;
+   case CSV_RECORD:
+     return format();
+   case ARRAY_RECORD:
+     return parse();
+   default:
+     throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
+   }
}
public int getType() {
@@ -73,39 +93,9 @@ public class Data implements WritableComparable<Data> {
return (type == EMPTY_DATA);
}
- public static String format(Object content,
- char fieldDelimiter, char recordDelimiter) {
- if (content instanceof String) {
- return (String)content + recordDelimiter;
-
- } else if (content instanceof Object[]) {
- StringBuilder sb = new StringBuilder();
- Object[] array = (Object[])content;
- for (int i = 0; i < array.length; i++) {
- if (i != 0) {
- sb.append(fieldDelimiter);
- }
-
- if (array[i] instanceof String) {
- // TODO: Also need to escape those special characters as documented in:
- // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
- sb.append("\'");
- sb.append(((String)array[i]).replaceAll(
- "\'", Matcher.quoteReplacement("\\\'")));
- sb.append("\'");
- } else if (array[i] instanceof byte[]) {
- sb.append(Arrays.toString((byte[])array[i]));
- } else {
- sb.append(array[i].toString());
- }
- }
- sb.append(recordDelimiter);
- return sb.toString();
-
- } else {
- throw new SqoopException(CoreError.CORE_0012,
- content.getClass().getName());
- }
+ /**
+  * Renders the record as CSV with the configured field delimiter; yields
+  * null when the record holds no data.
+  */
+ @Override
+ public String toString() {
+   return format();
+ }
@Override
@@ -150,11 +140,6 @@ public class Data implements WritableComparable<Data> {
}
@Override
- public String toString() {
- return format(content, DEFAULT_FIELD_DELIMITER, DEFAULT_RECORD_DELIMITER);
- }
-
- @Override
public void readFields(DataInput in) throws IOException {
type = readType(in);
switch (type) {
@@ -324,4 +309,70 @@ public class Data implements WritableComparable<Data> {
}
}
+ /**
+  * Renders the stored content as a single CSV line (no record delimiter)
+  * using the configured field delimiter.
+  *
+  * @return the CSV text, or null when the record is empty
+  * @throws SqoopException (CORE_0012) if the stored type is unknown
+  */
+ private String format() {
+   switch (type) {
+   case EMPTY_DATA:
+     return null;
+
+   case CSV_RECORD:
+     if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
+       return (String)content;
+     }
+     // Character scan instead of String.replaceAll: the delimiter must not be
+     // interpreted as a regex replacement ('$', '\\'), and commas that are
+     // part of a value must survive.
+     return replaceFieldDelimiter((String)content);
+
+   case ARRAY_RECORD:
+     StringBuilder sb = new StringBuilder();
+     Object[] array = (Object[])content;
+     for (int i = 0; i < array.length; i++) {
+       if (i != 0) {
+         sb.append(fieldDelimiter);
+       }
+
+       if (array[i] instanceof String) {
+         sb.append(stringDelimiter);
+         sb.append(escape((String)array[i]));
+         sb.append(stringDelimiter);
+       } else if (array[i] instanceof byte[]) {
+         sb.append(Arrays.toString((byte[])array[i]));
+       } else {
+         sb.append(String.valueOf(array[i]));
+       }
+     }
+     return sb.toString();
+
+   default:
+     throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+   }
+ }
+
+ /**
+  * Swaps each top-level default field delimiter in a CSV line for the
+  * configured one. Commas inside a quoted string (honouring escaped string
+  * delimiters) and inside the "[...]" form used for byte arrays are left
+  * untouched, so the result matches what format() emits for array records.
+  */
+ private String replaceFieldDelimiter(String csv) {
+   StringBuilder sb = new StringBuilder(csv.length());
+   boolean inString = false;
+   boolean escaped = false;
+   int bracketDepth = 0;
+   for (int i = 0; i < csv.length(); i++) {
+     char c = csv.charAt(i);
+     if (inString) {
+       if (escaped) {
+         escaped = false;
+       } else if (c == stringEscape) {
+         escaped = true;
+       } else if (c == stringDelimiter) {
+         inString = false;
+       }
+     } else if (c == stringDelimiter) {
+       inString = true;
+     } else if (c == '[') {
+       bracketDepth++;
+     } else if (c == ']' && bracketDepth > 0) {
+       bracketDepth--;
+     } else if (c == DEFAULT_FIELD_DELIMITER && bracketDepth == 0) {
+       c = fieldDelimiter;
+     }
+     sb.append(c);
+   }
+   return sb.toString();
+ }
+
+ /**
+  * Returns the stored content as an array of field values.
+  *
+  * @return null for an empty record, the stored array for ARRAY_RECORD, and
+  *         currently an empty array for CSV_RECORD (parsing not implemented)
+  * @throws SqoopException (CORE_0012) if the stored type is unknown
+  */
+ private Object[] parse() {
+   if (type == EMPTY_DATA) {
+     return null;
+   }
+   if (type == ARRAY_RECORD) {
+     return (Object[])content;
+   }
+   if (type == CSV_RECORD) {
+     // TODO: parse CSV into Array; until then an empty array is returned.
+     return new ArrayList<Object>().toArray();
+   }
+   throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+ }
+
+ /**
+  * Escapes a string value so it can be wrapped in string delimiters: the
+  * escape character is doubled first, then every string delimiter is
+  * prefixed with the escape character. Literal (non-regex) replacement is
+  * used so any delimiter/escape character is handled safely.
+  */
+ private String escape(String string) {
+   // TODO: Also need to escape those special characters as documented in:
+   // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+   String esc = String.valueOf(stringEscape);
+   // Escaping the escape character first keeps a trailing '\' in the value
+   // from being read as an escaped closing delimiter.
+   return string
+       .replace(esc, esc + esc)
+       .replace(String.valueOf(stringDelimiter), escapedStringDelimiter);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index eb02271..0a9f46d 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -79,22 +79,31 @@ public class SqoopMapper
}
@Override
- public void writeArrayRecord(Object[] record) {
- writeRecord(record);
+ public void setFieldDelimiter(char delimiter) {
+   // Same lazy creation of the reusable Data instance as writeContent().
+   if (data != null) {
+     data.setFieldDelimiter(delimiter);
+     return;
+   }
+   data = new Data();
+   data.setFieldDelimiter(delimiter);
+ }
+
+ @Override
+ public void writeArrayRecord(Object[] array) {
+ writeContent(array, Data.ARRAY_RECORD);
}
@Override
public void writeCsvRecord(String csv) {
- writeRecord(csv);
+ writeContent(csv, Data.CSV_RECORD);
}
@Override
- public void writeRecord(Object record) {
+ public void writeContent(Object content, int type) {
if (data == null) {
data = new Data();
}
- data.setContent(record);
+ data.setContent(content, type);
try {
context.write(data, NullWritable.get());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 71e76ca..23fcb62 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -74,7 +74,8 @@ public class SqoopOutputFormatLoadExecutor {
data.wait();
}
- data.setContent(key.getContent());
+ int type = key.getType();
+ data.setContent(key.getContent(type), type);
// notify reader that the data is ready
data.notify();
@@ -126,17 +127,22 @@ public class SqoopOutputFormatLoadExecutor {
public class OutputFormatDataReader extends DataReader {
@Override
+ public void setFieldDelimiter(char delimiter) {
+   // Configures the same Data instance that readContent() converts from.
+   data.setFieldDelimiter(delimiter);
+ }
+
+ @Override
public Object[] readArrayRecord() {
- return (Object[])readRecord();
+ return (Object[])readContent(Data.ARRAY_RECORD);
}
@Override
public String readCsvRecord() {
- return (String)readRecord();
+ return (String)readContent(Data.CSV_RECORD);
}
@Override
- public Object readRecord() {
+ public Object readContent(int type) {
synchronized (data) {
if (writerFinished) {
return null;
@@ -148,8 +154,8 @@ public class SqoopOutputFormatLoadExecutor {
data.wait();
}
- Object content = data.getContent();
- data.setContent(null);
+ Object content = data.getContent(type);
+ data.setContent(null, Data.EMPTY_DATA);
// notify writer that data is consumed
data.notify();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/io/TestData.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/core/src/test/java/org/apache/sqoop/io/TestData.java
new file mode 100644
index 0000000..9fe9d41
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/io/TestData.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.io.Data;
+import org.junit.Test;
+
+/**
+ * Unit tests for rendering {@link Data} array records as CSV text.
+ */
+public class TestData extends TestCase {
+
+ // Seed value; its long, double and String forms populate the test records.
+ private static final double TEST_NUMBER = Math.PI + 100;
+ /**
+  * Verifies ARRAY_RECORD to CSV conversion: numbers use their string form,
+  * strings are quoted with ' and an embedded ' is escaped as \', byte arrays
+  * use Arrays.toString, and a null element is rendered as the text "null".
+  */
+ @Test
+ public void testArrayToCsv() throws Exception {
+ Data data = new Data();
+ String expected;
+ String actual;
+
+ // with special characters (embedded quote and comma inside a string):
+ expected =
+ Long.valueOf((long)TEST_NUMBER) + "," +
+ Double.valueOf(TEST_NUMBER) + "," +
+ "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+ Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+ data.setContent(new Object[] {
+ Long.valueOf((long)TEST_NUMBER),
+ Double.valueOf(TEST_NUMBER),
+ String.valueOf(TEST_NUMBER) + "',s",
+ new byte[] {1, 2, 3, 4, 5} },
+ Data.ARRAY_RECORD);
+ actual = (String)data.getContent(Data.CSV_RECORD);
+ assertEquals(expected, actual);
+
+ // with a null element (rendered as the text "null"):
+ expected =
+ Long.valueOf((long)TEST_NUMBER) + "," +
+ Double.valueOf(TEST_NUMBER) + "," +
+ "null" + "," +
+ Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+ data.setContent(new Object[] {
+ Long.valueOf((long)TEST_NUMBER),
+ Double.valueOf(TEST_NUMBER),
+ null,
+ new byte[] {1, 2, 3, 4, 5} },
+ Data.ARRAY_RECORD);
+ actual = (String)data.getContent(Data.CSV_RECORD);
+ assertEquals(expected, actual);
+ }
+
+ /**
+  * Equality assertion that compares byte arrays by content (via
+  * Arrays.toString) and delegates every other type to TestCase.assertEquals.
+  */
+ public static void assertEquals(Object expected, Object actual) {
+ if (expected instanceof byte[]) {
+ assertEquals(Arrays.toString((byte[])expected),
+ Arrays.toString((byte[])actual));
+ } else {
+ TestCase.assertEquals(expected, actual);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
index 4b075d2..e685883 100644
--- a/core/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -18,6 +18,8 @@
package org.apache.sqoop.job;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -25,6 +27,12 @@ import org.apache.hadoop.fs.Path;
public class FileUtils {
+ /** Reports whether the given path exists on its file system. */
+ public static boolean exists(String file) throws IOException {
+   Path target = new Path(file);
+   return target.getFileSystem(new Configuration()).exists(target);
+ }
+
public static void delete(String file) throws IOException {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(new Configuration());
@@ -33,6 +41,27 @@ public class FileUtils {
}
}
+ /** Creates the directory unless a path with that name already exists. */
+ public static void mkdirs(String directory) throws IOException {
+   Path dir = new Path(directory);
+   FileSystem fileSystem = dir.getFileSystem(new Configuration());
+   if (fileSystem.exists(dir)) {
+     return;
+   }
+   fileSystem.mkdirs(dir);
+ }
+
+ /**
+  * Opens the given file for reading; the returned stream is not closed here.
+  * NOTE(review): ClassNotFoundException is declared but never thrown by this
+  * body; it is kept so existing callers' signatures stay source-compatible.
+  */
+ public static InputStream open(String fileName)
+     throws IOException, ClassNotFoundException {
+   Path source = new Path(fileName);
+   return source.getFileSystem(new Configuration()).open(source);
+ }
+
+ /**
+  * Creates the given file for writing with overwrite disabled; the returned
+  * stream is not closed here.
+  */
+ public static OutputStream create(String fileName) throws IOException {
+   Path target = new Path(fileName);
+   return target.getFileSystem(new Configuration()).create(target, false);
+ }
+
private FileUtils() {
// Disable explicit object creation
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index ab05c8e..64c767c 100644
--- a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.job;
import java.io.BufferedReader;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -30,7 +29,6 @@ import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -73,9 +71,8 @@ public class TestHdfsLoad extends TestCase {
conf.set(FileOutputFormat.OUTDIR, outdir);
JobUtils.runJob(conf);
- Path filepath = new Path(outdir, OUTPUT_FILE);
- FileSystem fs = filepath.getFileSystem(conf);
- DataInputStream filestream = new DataInputStream(fs.open(filepath));
+ String fileName = outdir + "/" + OUTPUT_FILE;
+ InputStream filestream = FileUtils.open(fileName);
BufferedReader filereader = new BufferedReader(new InputStreamReader(
filestream, Data.CHARSET_NAME));
verifyOutputText(filereader);
@@ -97,27 +94,26 @@ public class TestHdfsLoad extends TestCase {
FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
.asSubclass(CompressionCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
- Path filepath = new Path(outdir,
- OUTPUT_FILE + codec.getDefaultExtension());
- FileSystem fs = filepath.getFileSystem(conf);
- InputStream filestream = codec.createInputStream(fs.open(filepath));
+ String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
+ InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
BufferedReader filereader = new BufferedReader(new InputStreamReader(
filestream, Data.CHARSET_NAME));
verifyOutputText(filereader);
}
private void verifyOutputText(BufferedReader reader) throws IOException {
- String line = null;
- int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ String actual = null;
String expected;
- while ((line = reader.readLine()) != null){
- expected = Data.format(
- new Object[] {String.valueOf(index), new Integer(index), new Double(index)},
- Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER);
+ Data data = new Data();
+ int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ while ((actual = reader.readLine()) != null){
+ data.setContent(new Object[] {
+ new Integer(index), new Double(index), String.valueOf(index) },
+ Data.ARRAY_RECORD);
+ expected = data.toString();
index++;
- assertEquals(expected.toString(),
- line + Data.DEFAULT_RECORD_DELIMITER);
+ assertEquals(expected, actual);
}
reader.close();
@@ -137,7 +133,7 @@ public class TestHdfsLoad extends TestCase {
JobUtils.runJob(conf);
Path filepath = new Path(outdir,
- OUTPUT_FILE + HdfsSequenceImportLoader.extension);
+ OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(filepath));
verifyOutputSequence(filereader);
@@ -156,7 +152,7 @@ public class TestHdfsLoad extends TestCase {
JobUtils.runJob(conf);
Path filepath = new Path(outdir,
- OUTPUT_FILE + HdfsSequenceImportLoader.extension);
+ OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(filepath));
verifyOutputSequence(filereader);
@@ -164,12 +160,14 @@ public class TestHdfsLoad extends TestCase {
private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
int index = START_ID*NUMBER_OF_ROWS_PER_ID;
- Text expected = new Text();
Text actual = new Text();
+ Text expected = new Text();
+ Data data = new Data();
while (reader.next(actual)){
- expected.set(Data.format(
- new Object[] {String.valueOf(index), new Integer(index), new Double(index)},
- Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER));
+ data.setContent(new Object[] {
+ new Integer(index), new Double(index), String.valueOf(index) },
+ Data.ARRAY_RECORD);
+ expected.set(data.toString());
index++;
assertEquals(expected.toString(), actual.toString());
@@ -221,9 +219,9 @@ public class TestHdfsLoad extends TestCase {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
Object[] array = new Object[] {
- String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
- new Double(id*NUMBER_OF_ROWS_PER_ID+row)
+ new Double(id*NUMBER_OF_ROWS_PER_ID+row),
+ String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
};
writer.writeArrayRecord(array);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index f4701db..7646f57 100644
--- a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -50,9 +50,9 @@ import org.junit.Test;
public class TestMapReduce extends TestCase {
- private static final int START_ID = 1;
- private static final int NUMBER_OF_IDS = 9;
- private static final int NUMBER_OF_ROWS_PER_ID = 10;
+ private static final int START_PARTITION = 1;
+ private static final int NUMBER_OF_PARTITIONS = 9;
+ private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
@Test
public void testInputFormat() throws Exception {
@@ -64,7 +64,7 @@ public class TestMapReduce extends TestCase {
List<InputSplit> splits = inputformat.getSplits(job);
assertEquals(9, splits.size());
- for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+ for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
SqoopSplit split = (SqoopSplit)splits.get(id-1);
DummyPartition partition = (DummyPartition)split.getPartition();
assertEquals(id, partition.getId());
@@ -118,7 +118,7 @@ public class TestMapReduce extends TestCase {
@Override
public List<Partition> run(Context context) {
List<Partition> partitions = new LinkedList<Partition>();
- for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+ for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
DummyPartition partition = new DummyPartition();
partition.setId(id);
partitions.add(partition);
@@ -131,11 +131,11 @@ public class TestMapReduce extends TestCase {
@Override
public void run(Context context, Partition partition, DataWriter writer) {
int id = ((DummyPartition)partition).getId();
- for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+ for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
writer.writeArrayRecord(new Object[] {
- String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
- new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
- new Double(id*NUMBER_OF_ROWS_PER_ID+row)});
+ new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
}
}
}
@@ -160,15 +160,16 @@ public class TestMapReduce extends TestCase {
public static class DummyRecordWriter
extends RecordWriter<Data, NullWritable> {
- private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data();
@Override
public void write(Data key, NullWritable value) {
data.setContent(new Object[] {
- String.valueOf(index),
new Integer(index),
- new Double(index)});
+ new Double(index),
+ String.valueOf(index)},
+ Data.ARRAY_RECORD);
index++;
assertEquals(data.toString(), key.toString());
@@ -201,7 +202,7 @@ public class TestMapReduce extends TestCase {
}
public static class DummyLoader extends Loader {
- private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data expected = new Data();
private Data actual = new Data();
@@ -209,12 +210,13 @@ public class TestMapReduce extends TestCase {
public void run(Context context, DataReader reader) {
Object[] array;
while ((array = reader.readArrayRecord()) != null) {
- actual.setContent(array);
+ actual.setContent(array, Data.ARRAY_RECORD);
expected.setContent(new Object[] {
- String.valueOf(index),
new Integer(index),
- new Double(index)});
+ new Double(index),
+ String.valueOf(index)},
+ Data.ARRAY_RECORD);
index++;
assertEquals(expected.toString(), actual.toString());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
index b9b2f49..18e2fb7 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
@@ -27,6 +27,8 @@ public abstract class DataReader {
public abstract String readCsvRecord();
- public abstract Object readRecord();
+ public abstract Object readContent(int type);
+
+ public abstract void setFieldDelimiter(char fieldDelimiter);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
index 29c4283..30a0c7c 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
@@ -27,6 +27,8 @@ public abstract class DataWriter {
public abstract void writeCsvRecord(String csv);
- public abstract void writeRecord(Object record);
+ public abstract void writeContent(Object content, int type);
+
+ public abstract void setFieldDelimiter(char fieldDelimiter);
}