You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/12/02 00:24:28 UTC

sqoop git commit: SQOOP-1750: Support Map Type in CSV IDF

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 34d7066c3 -> d68c05d3a


SQOOP-1750: Support Map Type in CSV IDF

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d68c05d3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d68c05d3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d68c05d3

Branch: refs/heads/sqoop2
Commit: d68c05d3ad0c99dc3dee2d07b34f919b79dab09d
Parents: 34d7066
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Dec 1 15:24:02 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Dec 1 15:24:02 2014 -0800

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          |  79 ++++++++-
 .../idf/TestCSVIntermediateDataFormat.java      | 166 ++++++++++++++++++-
 2 files changed, 236 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d68c05d3/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 4f2baf9..bdab7a4 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.schema.type.FloatingPoint;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalDateTime;
 import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 
 import java.io.DataInput;
@@ -39,8 +40,11 @@ import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.regex.Matcher;
 
 
@@ -85,6 +89,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>();
+  private final List<Integer> mapTypeColumnIndices = new ArrayList<Integer>();
 
   private Schema schema;
 
@@ -129,6 +134,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         byteTypeColumnIndices.add(i);
       } else if (isColumnListType(col)) {
         listTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.MAP) {
+        mapTypeColumnIndices.add(i);
       }
       i++;
     }
@@ -147,7 +154,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
     boolean quoted = false;
     boolean escaped = false;
-    boolean insideJSON = false;
 
     List<String> parsedData = new LinkedList<String>();
     StringBuilder builder = new StringBuilder();
@@ -167,7 +173,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         escaped = !escaped;
         break;
       case SEPARATOR_CHARACTER:
-        if (quoted || insideJSON) {
+        if (quoted) {
           builder.append(c);
         } else {
           parsedData.add(builder.toString());
@@ -217,12 +223,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         objectArray[i] = null;
         continue;
       }
-      objectArray[i] = parseStringArrayElement(fieldStringArray[i], columnArray[i]);
+      objectArray[i] = parseCSVStringArrayElement(fieldStringArray[i], columnArray[i]);
     }
     return objectArray;
   }
 
-  private Object parseStringArrayElement(String fieldString, Column column) {
+  private Object parseCSVStringArrayElement(String fieldString, Column column) {
     Object returnValue = null;
 
     switch (column.getType()) {
@@ -271,6 +277,9 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     case SET:
       returnValue = parseListElementFromJSON(fieldString);
       break;
+    case MAP:
+      returnValue = parseMapElementFromJSON(fieldString);
+      break;
     default:
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
           "Column type from schema was not recognized for " + column.getType());
@@ -287,11 +296,60 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
     }
     if (array != null) {
-     return array.toArray();
+      return array.toArray();
+    }
+    return null;
+  }
+
+  private Map<Object, Object> parseMapElementFromJSON(String fieldString) {
+
+    JSONObject object = null;
+    try {
+      object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString));
+    } catch (org.json.simple.parser.ParseException e) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
+    }
+    if (object != null) {
+      return toMap(object);
     }
     return null;
   }
 
+  private List<Object> toList(JSONArray array) {
+    List<Object> list = new ArrayList<Object>();
+    for (int i = 0; i < array.size(); i++) {
+      Object value = array.get(i);
+      if (value instanceof JSONArray) {
+        value = toList((JSONArray) value);
+      }
+
+      else if (value instanceof JSONObject) {
+        value = toMap((JSONObject) value);
+      }
+      list.add(value);
+    }
+    return list;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<Object, Object> toMap(JSONObject object) {
+    Map<Object, Object> elementMap = new HashMap<Object, Object>();
+    Set<Map.Entry<Object, Object>> entries = object.entrySet();
+    for (Map.Entry<Object, Object> entry : entries) {
+      Object value = entry.getValue();
+
+      if (value instanceof JSONArray) {
+        value = toList((JSONArray) value);
+      }
+
+      else if (value instanceof JSONObject) {
+        value = toMap((JSONObject) value);
+      }
+      elementMap.put(entry.getKey(), value);
+    }
+    return elementMap;
+  }
+
   /**
    * Appends the actual java objects into CSV string {@inheritDoc}
    */
@@ -351,6 +409,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    *
    * @param stringArray
    */
+  @SuppressWarnings("unchecked")
   private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) {
     for (int i : stringTypeColumnIndices) {
       stringArray[i] = escapeString((String) stringArray[i]);
@@ -361,6 +420,16 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     for (int i : listTypeColumnIndices) {
       stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]);
     }
+    for (int i : mapTypeColumnIndices) {
+      stringArray[i] = encodeMap((Map<Object, Object>) stringArray[i], columnArray[i]);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String encodeMap(Map<Object, Object> map, Column column) {
+    JSONObject object = new JSONObject();
+    object.putAll(map);
+    return encloseWithQuote(object.toJSONString());
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d68c05d3/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index b629897..bd082aa 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -26,12 +26,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Array;
 import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.Bit;
 import org.apache.sqoop.schema.type.Date;
@@ -43,8 +46,6 @@ import org.junit.Test;
 
 public class TestCSVIntermediateDataFormat {
 
-  private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
-
   private IntermediateDataFormat<?> dataFormat;
 
   @Before
@@ -54,8 +55,10 @@ public class TestCSVIntermediateDataFormat {
 
   private String getByteFieldString(byte[] byteFieldData) {
     try {
-      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
-    } catch(UnsupportedEncodingException e) {
+      return new StringBuilder("'")
+          .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET))
+          .append("'").toString();
+    } catch (UnsupportedEncodingException e) {
       // Should never get to this point because ISO-8859-1 is a standard codec.
       return null;
     }
@@ -566,7 +569,162 @@ public class TestCSVIntermediateDataFormat {
     String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
     assertEquals(expected, dataFormat.getTextData());
   }
+  //**************test cases for map**********************
 
+  @Test
+  public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+    // place the map inside the object array
+    Object[] data = new Object[2];
+    data[0] = map;
+    data[1] = "text";
+    dataFormat.setObjectData(data);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(map, expectedMap);
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  @Test
+  public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
+        new FixedPoint("number"))));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    List<Integer> intList = new ArrayList<Integer>();
+    intList.add(11);
+    intList.add(12);
+    givenMap.put("testKey", intList);
+    // place the map (with an integer list value) inside the object array
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    dataFormat.setObjectData(data);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(givenMap.toString(), expectedMap.toString());
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  @Test
+  public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
+        new Text("text"))));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    List<String> stringList = new ArrayList<String>();
+    stringList.add("A");
+    stringList.add("A");
+    givenMap.put("testKey", stringList);
+    // place the map (with a string list value) inside the object array
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    dataFormat.setObjectData(data);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(givenMap.toString(), expectedMap.toString());
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  @Test
+  public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
+        new Text("text"))));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    List<String> stringList = new ArrayList<String>();
+    stringList.add("A");
+    stringList.add("A");
+    Map<String, List<String>> anotherMap = new HashMap<String, List<String>>();
+    anotherMap.put("anotherKey", stringList);
+    givenMap.put("testKey", anotherMap);
+    // place the map (with a nested map value) inside the object array
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    dataFormat.setObjectData(data);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(givenMap.toString(), expectedMap.toString());
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+ @Test
+  public void testMapWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    givenMap.put("testKey", "testValue");
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    String testData = "'{\"testKey\":\"testValue\"}','text'";
+    dataFormat.setTextData(testData);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(givenMap, expectedMap);
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  @Test
+  public void testMapWithComplexValueWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    givenMap.put("testKey", "testValue");
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    String testData = "'{\"testKey\":\"testValue\"}','text'";
+    dataFormat.setTextData(testData);
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
+    assertEquals(givenMap, expectedMap);
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  @Test
+  public void testMapWithObjectArrayInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Map<Object, Object> givenMap = new HashMap<Object, Object>();
+    givenMap.put("testKey", "testValue");
+    Object[] data = new Object[2];
+    data[0] = givenMap;
+    data[1] = "text";
+    String testData = "'{\"testKey\":\"testValue\"}','text'";
+    dataFormat.setObjectData(data);
+    assertEquals(testData, dataFormat.getTextData());
+  }
+
+  @Test
+  public void testMapWithCSVTextInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    String testData = "'{\"testKey\":\"testValue\"}','text'";
+    dataFormat.setTextData(testData);
+    assertEquals(testData, dataFormat.getTextData());
+  }
   //**************test cases for schema*******************
   @Test(expected=SqoopException.class)
   public void testEmptySchema() {