Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/10/16 00:09:00 UTC

git commit: SQOOP-533: Intermediate data format support for import

Updated Branches:
  refs/heads/sqoop2 b61de724e -> 27aa78679


SQOOP-533: Intermediate data format support for import

(Bilung Lee via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/27aa7867
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/27aa7867
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/27aa7867

Branch: refs/heads/sqoop2
Commit: 27aa786793887623af655774c7fc4022590e0967
Parents: b61de72
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Oct 15 15:07:38 2012 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Oct 15 15:07:38 2012 -0700

----------------------------------------------------------------------
 .../sqoop/connector/jdbc/TestImportExtractor.java  |    7 +-
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |   16 +-
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |   10 +-
 .../main/java/org/apache/sqoop/job/io/Data.java    |  157 ++++++++++-----
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |   19 ++-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |   18 +-
 .../test/java/org/apache/sqoop/io/TestData.java    |   76 +++++++
 .../test/java/org/apache/sqoop/job/FileUtils.java  |   29 +++
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |   48 ++---
 .../java/org/apache/sqoop/job/TestMapReduce.java   |   34 ++--
 .../java/org/apache/sqoop/job/io/DataReader.java   |    4 +-
 .../java/org/apache/sqoop/job/io/DataWriter.java   |    4 +-
 12 files changed, 302 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 519286b..70e29e5 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -144,6 +144,11 @@ public class TestImportExtractor extends TestCase {
     int indx = START;
 
     @Override
+    public void setFieldDelimiter(char fieldDelimiter) {
+      // do nothing and use default delimiter
+    }
+
+    @Override
     public void writeArrayRecord(Object[] array) {
       for (int i = 0; i < array.length; i++) {
         if (array[i] instanceof Integer) {
@@ -163,7 +168,7 @@ public class TestImportExtractor extends TestCase {
     }
 
     @Override
-    public void writeRecord(Object record) {
+    public void writeContent(Object content, int type) {
       fail("This method should not be invoked.");
     }
   }
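
The test double above reflects the revised DataWriter contract: writeRecord(Object)
is gone, and all writes funnel through writeContent(Object, int) with an explicit
type tag. For reference, a minimal self-contained sketch of a writer under the new
contract (the class name and stdout sink are illustrative, not part of this patch):

  import org.apache.sqoop.job.io.Data;
  import org.apache.sqoop.job.io.DataWriter;

  public class StdoutDataWriter extends DataWriter {
    @Override
    public void setFieldDelimiter(char fieldDelimiter) {
      // No-op here; a real writer forwards this to its Data buffer.
    }

    @Override
    public void writeArrayRecord(Object[] array) {
      writeContent(array, Data.ARRAY_RECORD);
    }

    @Override
    public void writeCsvRecord(String csv) {
      writeContent(csv, Data.CSV_RECORD);
    }

    @Override
    public void writeContent(Object content, int type) {
      System.out.println("type=" + type + ", content=" + content);
    }
  }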

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index 8802cbc..854d325 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -37,18 +37,18 @@ import org.apache.sqoop.utils.ClassLoadingUtils;
 
 public class HdfsSequenceImportLoader extends Loader {
 
-  public static final String extension = ".seq";
+  public static final String EXTENSION = ".seq";
 
   private final char fieldDelimiter;
-  private final char recordDelimiter;
 
   public HdfsSequenceImportLoader() {
     fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
   }
 
   @Override
   public void run(Context context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
     Configuration conf = ((EtlContext)context).getConfiguration();
     String filename =
         context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
@@ -71,12 +71,12 @@ public class HdfsSequenceImportLoader extends Loader {
       }
     }
 
-    filename += extension;
+    filename += EXTENSION;
 
     try {
       Path filepath = new Path(filename);
       SequenceFile.Writer filewriter;
-      if (codecname != null) {
+      if (codec != null) {
         filewriter = SequenceFile.createWriter(conf,
             SequenceFile.Writer.file(filepath),
             SequenceFile.Writer.keyClass(Text.class),
@@ -90,10 +90,10 @@ public class HdfsSequenceImportLoader extends Loader {
           SequenceFile.Writer.compression(CompressionType.NONE));
       }
 
-      Object record;
+      String csv;
       Text text = new Text();
-      while ((record = reader.readRecord()) != null) {
-        text.set(Data.format(record, fieldDelimiter, recordDelimiter));
+      while ((csv = reader.readCsvRecord()) != null) {
+        text.set(csv);
         filewriter.append(text, NullWritable.get());
       }
       filewriter.close();
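
Both HDFS loaders now follow the same pattern: declare the field delimiter up
front, then pull fully formatted CSV lines from the reader. A condensed sketch of
that consumption loop (class name and stdout sink are illustrative; the real
loaders write to HDFS as shown in the hunks above and below):

  import org.apache.sqoop.job.io.Data;
  import org.apache.sqoop.job.io.DataReader;

  public class CsvEchoLoader {
    public void run(DataReader reader) {
      // Tell the reader how fields should be delimited; the formatting
      // itself now happens inside Data, not in the loader.
      reader.setFieldDelimiter(Data.DEFAULT_FIELD_DELIMITER);

      String csv;
      while ((csv = reader.readCsvRecord()) != null) {
        System.out.println(csv);
      }
    }
  }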

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index b1ee255..240265b 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -47,6 +47,8 @@ public class HdfsTextImportLoader extends Loader {
 
   @Override
   public void run(Context context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
     Configuration conf = ((EtlContext)context).getConfiguration();
     String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
     String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
@@ -76,7 +78,7 @@ public class HdfsTextImportLoader extends Loader {
 
       BufferedWriter filewriter;
       DataOutputStream filestream = fs.create(filepath, false);
-      if (codecname != null) {
+      if (codec != null) {
         filewriter = new BufferedWriter(new OutputStreamWriter(
             codec.createOutputStream(filestream, codec.createCompressor()),
             Data.CHARSET_NAME));
@@ -85,9 +87,9 @@ public class HdfsTextImportLoader extends Loader {
             filestream, Data.CHARSET_NAME));
       }
 
-      Object record;
-      while ((record = reader.readRecord()) != null) {
-        filewriter.write(Data.format(record, fieldDelimiter, recordDelimiter));
+      String csv;
+      while ((csv = reader.readCsvRecord()) != null) {
+        filewriter.write(csv + recordDelimiter);
       }
       filewriter.close();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java
index 2732e83..4ddd132 100644
--- a/core/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/core/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.regex.Matcher;
 
@@ -43,26 +44,45 @@ public class Data implements WritableComparable<Data> {
   public static final int ARRAY_RECORD = 2;
   private int type = EMPTY_DATA;
 
-  public static final char DEFAULT_FIELD_DELIMITER = ',';
-  public static final char DEFAULT_RECORD_DELIMITER = '\n';
   public static final String CHARSET_NAME = "UTF-8";
 
-  public void setContent(Object content) {
-    if (content == null) {
-      this.type = EMPTY_DATA;
-    } else if (content instanceof String) {
-      this.type = CSV_RECORD;
-    } else if (content instanceof Object[]) {
-      this.type = ARRAY_RECORD;
-    } else {
-      throw new SqoopException(CoreError.CORE_0012,
-          content.getClass().getName());
+  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+  public static final char DEFAULT_STRING_DELIMITER = '\'';
+  public static final char DEFAULT_STRING_ESCAPE = '\\';
+  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
+  private char stringEscape = DEFAULT_STRING_ESCAPE;
+  private String escapedStringDelimiter = String.valueOf(new char[] {
+      stringEscape, stringDelimiter
+  });
+
+  public void setFieldDelimiter(char fieldDelimiter) {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  public void setContent(Object content, int type) {
+    switch (type) {
+    case EMPTY_DATA:
+    case CSV_RECORD:
+    case ARRAY_RECORD:
+      this.type = type;
+      this.content = content;
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
     }
-    this.content = content;
   }
 
-  public Object getContent() {
-    return content;
+  public Object getContent(int targetType) {
+    switch (targetType) {
+    case CSV_RECORD:
+      return format();
+    case ARRAY_RECORD:
+      return parse();
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
+    }
   }
 
   public int getType() {
@@ -73,39 +93,9 @@ public class Data implements WritableComparable<Data> {
     return (type == EMPTY_DATA);
   }
 
-  public static String format(Object content,
-      char fieldDelimiter, char recordDelimiter) {
-    if (content instanceof String) {
-      return (String)content + recordDelimiter;
-
-    } else if (content instanceof Object[]) {
-      StringBuilder sb = new StringBuilder();
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        if (i != 0) {
-          sb.append(fieldDelimiter);
-        }
-
-        if (array[i] instanceof String) {
-          // TODO: Also need to escape those special characters as documented in:
-          // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
-          sb.append("\'");
-          sb.append(((String)array[i]).replaceAll(
-              "\'", Matcher.quoteReplacement("\\\'")));
-          sb.append("\'");
-        } else if (array[i] instanceof byte[]) {
-          sb.append(Arrays.toString((byte[])array[i]));
-        } else {
-          sb.append(array[i].toString());
-        }
-      }
-      sb.append(recordDelimiter);
-      return sb.toString();
-
-    } else {
-      throw new SqoopException(CoreError.CORE_0012,
-          content.getClass().getName());
-    }
+  @Override
+  public String toString() {
+    return (String)getContent(CSV_RECORD);
   }
 
   @Override
@@ -150,11 +140,6 @@ public class Data implements WritableComparable<Data> {
   }
 
   @Override
-  public String toString() {
-    return format(content, DEFAULT_FIELD_DELIMITER, DEFAULT_RECORD_DELIMITER);
-  }
-
-  @Override
   public void readFields(DataInput in) throws IOException {
     type = readType(in);
     switch (type) {
@@ -324,4 +309,70 @@ public class Data implements WritableComparable<Data> {
     }
   }
 
+  private String format() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
+        return (String)content;
+      } else {
+        // TODO: need to exclude the case where comma is part of a string.
+        return ((String)content).replaceAll(
+            String.valueOf(DEFAULT_FIELD_DELIMITER),
+            String.valueOf(fieldDelimiter));
+      }
+
+    case ARRAY_RECORD:
+      StringBuilder sb = new StringBuilder();
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        if (i != 0) {
+          sb.append(fieldDelimiter);
+        }
+
+        if (array[i] instanceof String) {
+          sb.append(stringDelimiter);
+          sb.append(escape((String)array[i]));
+          sb.append(stringDelimiter);
+        } else if (array[i] instanceof byte[]) {
+          sb.append(Arrays.toString((byte[])array[i]));
+        } else {
+          sb.append(String.valueOf(array[i]));
+        }
+      }
+      return sb.toString();
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private Object[] parse() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      ArrayList<Object> list = new ArrayList<Object>();
+      // todo: need to parse CSV into Array
+      return list.toArray();
+
+    case ARRAY_RECORD:
+      return (Object[])content;
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private String escape(String string) {
+    // TODO: Also need to escape those special characters as documented in:
+    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+    String regex = String.valueOf(stringDelimiter);
+    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
+    return string.replaceAll(regex, replacement);
+  }
+
 }
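
The net effect of the Data rework: content is stored with an explicit type tag and
converted lazily on read, instead of being formatted eagerly by a static helper. A
round trip under the new API (values arbitrary):

  import org.apache.sqoop.job.io.Data;

  public class DataRoundTrip {
    public static void main(String[] args) {
      Data data = new Data();

      // Store an array record; nothing is formatted yet.
      data.setContent(new Object[] {1, 2.5, "O'Brien"}, Data.ARRAY_RECORD);

      // Conversion happens on read; string fields are wrapped in the
      // string delimiter and escaped: 1,2.5,'O\'Brien'
      String csv = (String) data.getContent(Data.CSV_RECORD);

      // Requesting the stored representation returns it as-is.
      Object[] array = (Object[]) data.getContent(Data.ARRAY_RECORD);

      System.out.println(csv + " (" + array.length + " fields)");
    }
  }

Note that parse() is still a stub for genuine CSV-to-array conversion (the todo
above), so only the array-to-CSV direction is complete in this commit.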

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index eb02271..0a9f46d 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -79,22 +79,31 @@ public class SqoopMapper
     }
 
     @Override
-    public void writeArrayRecord(Object[] record) {
-      writeRecord(record);
+    public void setFieldDelimiter(char fieldDelimiter) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setFieldDelimiter(fieldDelimiter);
+    }
+
+    @Override
+    public void writeArrayRecord(Object[] array) {
+      writeContent(array, Data.ARRAY_RECORD);
     }
 
     @Override
     public void writeCsvRecord(String csv) {
-      writeRecord(csv);
+      writeContent(csv, Data.CSV_RECORD);
     }
 
     @Override
-    public void writeRecord(Object record) {
+    public void writeContent(Object content, int type) {
       if (data == null) {
         data = new Data();
       }
 
-      data.setContent(record);
+      data.setContent(content, type);
       try {
         context.write(data, NullWritable.get());
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 71e76ca..23fcb62 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -74,7 +74,8 @@ public class SqoopOutputFormatLoadExecutor {
             data.wait();
           }
 
-          data.setContent(key.getContent());
+          int type = key.getType();
+          data.setContent(key.getContent(type), type);
 
           // notify reader that the data is ready
           data.notify();
@@ -126,17 +127,22 @@ public class SqoopOutputFormatLoadExecutor {
 
   public class OutputFormatDataReader extends DataReader {
     @Override
+    public void setFieldDelimiter(char fieldDelimiter) {
+      data.setFieldDelimiter(fieldDelimiter);
+    }
+
+    @Override
     public Object[] readArrayRecord() {
-      return (Object[])readRecord();
+      return (Object[])readContent(Data.ARRAY_RECORD);
     }
 
     @Override
     public String readCsvRecord() {
-      return (String)readRecord();
+      return (String)readContent(Data.CSV_RECORD);
     }
 
     @Override
-    public Object readRecord() {
+    public Object readContent(int type) {
       synchronized (data) {
         if (writerFinished) {
           return null;
@@ -148,8 +154,8 @@ public class SqoopOutputFormatLoadExecutor {
             data.wait();
           }
 
-          Object content = data.getContent();
-          data.setContent(null);
+          Object content = data.getContent(type);
+          data.setContent(null, Data.EMPTY_DATA);
 
           // notify writer that data is consumed
           data.notify();
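
The conversion-aware getContent(type) matters here: the writer thread parks
whatever representation the connector produced, and the reader thread asks for the
representation its loader wants. A minimal model of this single-slot wait/notify
handoff (simplified: no end-of-data or interrupt handling, unlike the real
executor):

  import org.apache.sqoop.job.io.Data;

  public class DataSlot {
    private final Data data = new Data();

    public void put(Object content, int type) throws InterruptedException {
      synchronized (data) {
        while (!data.isEmpty()) {
          data.wait();                 // slot still full; wait for reader
        }
        data.setContent(content, type);
        data.notify();                 // wake the waiting reader
      }
    }

    public Object take(int targetType) throws InterruptedException {
      synchronized (data) {
        while (data.isEmpty()) {
          data.wait();                 // nothing written yet
        }
        Object content = data.getContent(targetType);
        data.setContent(null, Data.EMPTY_DATA);  // mark slot consumed
        data.notify();                 // wake the waiting writer
        return content;
      }
    }
  }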

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/io/TestData.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/core/src/test/java/org/apache/sqoop/io/TestData.java
new file mode 100644
index 0000000..9fe9d41
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/io/TestData.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.io.Data;
+import org.junit.Test;
+
+public class TestData extends TestCase {
+
+  private static final double TEST_NUMBER = Math.PI + 100;
+  @Test
+  public void testArrayToCsv() throws Exception {
+    Data data = new Data();
+    String expected;
+    String actual;
+
+    // with special characters:
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        String.valueOf(TEST_NUMBER) + "',s",
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+
+    // with null characters:
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "null" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        null,
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+  }
+
+  public static void assertEquals(Object expected, Object actual) {
+    if (expected instanceof byte[]) {
+      assertEquals(Arrays.toString((byte[])expected),
+          Arrays.toString((byte[])actual));
+    } else {
+      TestCase.assertEquals(expected, actual);
+    }
+  }
+
+}
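
Beyond string escaping, the test pins down how non-string fields format: null
becomes the bare token null, and byte[] uses the Arrays.toString() form. A small
illustration (values arbitrary):

  import org.apache.sqoop.job.io.Data;

  public class NullAndBytesExample {
    public static void main(String[] args) {
      Data data = new Data();
      data.setContent(new Object[] {7, null, new byte[] {1, 2, 3}},
          Data.ARRAY_RECORD);
      // Prints: 7,null,[1, 2, 3]
      System.out.println(data.getContent(Data.CSV_RECORD));
    }
  }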

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
index 4b075d2..e685883 100644
--- a/core/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -18,6 +18,8 @@
 package org.apache.sqoop.job;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,6 +27,12 @@ import org.apache.hadoop.fs.Path;
 
 public class FileUtils {
 
+  public static boolean exists(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    return fs.exists(path);
+  }
+
   public static void delete(String file) throws IOException {
     Path path = new Path(file);
     FileSystem fs = path.getFileSystem(new Configuration());
@@ -33,6 +41,27 @@ public class FileUtils {
     }
   }
 
+  public static void mkdirs(String directory) throws IOException {
+    Path path = new Path(directory);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
+  }
+
+  public static InputStream open(String fileName)
+    throws IOException, ClassNotFoundException {
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.open(filepath);
+  }
+
+  public static OutputStream create(String fileName) throws IOException {
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.create(filepath, false);
+  }
+
   private FileUtils() {
     // Disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index ab05c8e..64c767c 100644
--- a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.job;
 
 import java.io.BufferedReader;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +29,6 @@ import java.util.List;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -73,9 +71,8 @@ public class TestHdfsLoad extends TestCase {
     conf.set(FileOutputFormat.OUTDIR, outdir);
     JobUtils.runJob(conf);
 
-    Path filepath = new Path(outdir, OUTPUT_FILE);
-    FileSystem fs = filepath.getFileSystem(conf);
-    DataInputStream filestream = new DataInputStream(fs.open(filepath));
+    String fileName = outdir + "/" +  OUTPUT_FILE;
+    InputStream filestream = FileUtils.open(fileName);
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
         filestream, Data.CHARSET_NAME));
     verifyOutputText(filereader);
@@ -97,27 +94,26 @@ public class TestHdfsLoad extends TestCase {
         FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
         .asSubclass(CompressionCodec.class);
     CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
-    Path filepath = new Path(outdir,
-        OUTPUT_FILE + codec.getDefaultExtension());
-    FileSystem fs = filepath.getFileSystem(conf);
-    InputStream filestream = codec.createInputStream(fs.open(filepath));
+    String fileName = outdir + "/" +  OUTPUT_FILE + codec.getDefaultExtension();
+    InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
         filestream, Data.CHARSET_NAME));
     verifyOutputText(filereader);
   }
 
   private void verifyOutputText(BufferedReader reader) throws IOException {
-    String line = null;
-    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    String actual = null;
     String expected;
-    while ((line = reader.readLine()) != null){
-      expected = Data.format(
-          new Object[] {String.valueOf(index), new Integer(index), new Double(index)},
-          Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER);
+    Data data = new Data();
+    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    while ((actual = reader.readLine()) != null){
+      data.setContent(new Object[] {
+          new Integer(index), new Double(index), String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      expected = data.toString();
       index++;
 
-      assertEquals(expected.toString(),
-          line + Data.DEFAULT_RECORD_DELIMITER);
+      assertEquals(expected, actual);
     }
     reader.close();
 
@@ -137,7 +133,7 @@ public class TestHdfsLoad extends TestCase {
     JobUtils.runJob(conf);
 
     Path filepath = new Path(outdir,
-        OUTPUT_FILE + HdfsSequenceImportLoader.extension);
+        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
     SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
         SequenceFile.Reader.file(filepath));
     verifyOutputSequence(filereader);
@@ -156,7 +152,7 @@ public class TestHdfsLoad extends TestCase {
     JobUtils.runJob(conf);
 
     Path filepath = new Path(outdir,
-        OUTPUT_FILE + HdfsSequenceImportLoader.extension);
+        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
     SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
         SequenceFile.Reader.file(filepath));
     verifyOutputSequence(filereader);
@@ -164,12 +160,14 @@ public class TestHdfsLoad extends TestCase {
 
   private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
     int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-    Text expected = new Text();
     Text actual = new Text();
+    Text expected = new Text();
+    Data data = new Data();
     while (reader.next(actual)){
-      expected.set(Data.format(
-          new Object[] {String.valueOf(index), new Integer(index), new Double(index)},
-          Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER));
+      data.setContent(new Object[] {
+          new Integer(index), new Double(index), String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      expected.set(data.toString());
       index++;
 
       assertEquals(expected.toString(), actual.toString());
@@ -221,9 +219,9 @@ public class TestHdfsLoad extends TestCase {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
         Object[] array = new Object[] {
-          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
           new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
-          new Double(id*NUMBER_OF_ROWS_PER_ID+row)
+          new Double(id*NUMBER_OF_ROWS_PER_ID+row),
+          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
         };
         writer.writeArrayRecord(array);
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index f4701db..7646f57 100644
--- a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -50,9 +50,9 @@ import org.junit.Test;
 
 public class TestMapReduce extends TestCase {
 
-  private static final int START_ID = 1;
-  private static final int NUMBER_OF_IDS = 9;
-  private static final int NUMBER_OF_ROWS_PER_ID = 10;
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
 
   @Test
   public void testInputFormat() throws Exception {
@@ -64,7 +64,7 @@ public class TestMapReduce extends TestCase {
     List<InputSplit> splits = inputformat.getSplits(job);
     assertEquals(9, splits.size());
 
-    for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+    for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
       SqoopSplit split = (SqoopSplit)splits.get(id-1);
       DummyPartition partition = (DummyPartition)split.getPartition();
       assertEquals(id, partition.getId());
@@ -118,7 +118,7 @@ public class TestMapReduce extends TestCase {
     @Override
     public List<Partition> run(Context context) {
       List<Partition> partitions = new LinkedList<Partition>();
-      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
         DummyPartition partition = new DummyPartition();
         partition.setId(id);
         partitions.add(partition);
@@ -131,11 +131,11 @@ public class TestMapReduce extends TestCase {
     @Override
     public void run(Context context, Partition partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
-      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
         writer.writeArrayRecord(new Object[] {
-            String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
-            new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
-            new Double(id*NUMBER_OF_ROWS_PER_ID+row)});
+            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
       }
     }
   }
@@ -160,15 +160,16 @@ public class TestMapReduce extends TestCase {
 
     public static class DummyRecordWriter
         extends RecordWriter<Data, NullWritable> {
-      private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
       private Data data = new Data();
 
       @Override
       public void write(Data key, NullWritable value) {
         data.setContent(new Object[] {
-          String.valueOf(index),
           new Integer(index),
-          new Double(index)});
+          new Double(index),
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
         index++;
 
         assertEquals(data.toString(), key.toString());
@@ -201,7 +202,7 @@ public class TestMapReduce extends TestCase {
   }
 
   public static class DummyLoader extends Loader {
-    private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
     private Data expected = new Data();
     private Data actual = new Data();
 
@@ -209,12 +210,13 @@ public class TestMapReduce extends TestCase {
     public void run(Context context, DataReader reader) {
       Object[] array;
       while ((array = reader.readArrayRecord()) != null) {
-        actual.setContent(array);
+        actual.setContent(array, Data.ARRAY_RECORD);
 
         expected.setContent(new Object[] {
-          String.valueOf(index),
           new Integer(index),
-          new Double(index)});
+          new Double(index),
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
         index++;
 
         assertEquals(expected.toString(), actual.toString());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
index b9b2f49..18e2fb7 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
@@ -27,6 +27,8 @@ public abstract class DataReader {
 
   public abstract String readCsvRecord();
 
-  public abstract Object readRecord();
+  public abstract Object readContent(int type);
+
+  public abstract void setFieldDelimiter(char fieldDelimiter);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
index 29c4283..30a0c7c 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
@@ -27,6 +27,8 @@ public abstract class DataWriter {
 
   public abstract void writeCsvRecord(String csv);
 
-  public abstract void writeRecord(Object record);
+  public abstract void writeContent(Object content, int type);
+
+  public abstract void setFieldDelimiter(char fieldDelimiter);
 
 }
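
Taken together, the setFieldDelimiter() additions let a loader choose its own
field delimiter without touching formatting code: the choice propagates from the
DataReader into the Data buffer and is applied when the record is rendered. A
closing sketch (delimiter choice arbitrary; the TODO above about delimiters
embedded inside quoted strings still applies when re-delimiting stored CSV):

  import org.apache.sqoop.job.io.Data;

  public class DelimiterExample {
    public static void main(String[] args) {
      Data data = new Data();
      data.setFieldDelimiter('|');
      data.setContent(new Object[] {1, 2, 3}, Data.ARRAY_RECORD);
      // Prints: 1|2|3
      System.out.println(data.getContent(Data.CSV_RECORD));
    }
  }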