You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/09 00:47:14 UTC

sqoop git commit: SQOOP-1845: Sqoop2: Make DateTime Column type support datetime with and without timezone

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 5f58a1d46 -> fdc046950


SQOOP-1845: Sqoop2: Make DateTime Column type support datetime with and without timezone

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fdc04695
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fdc04695
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fdc04695

Branch: refs/heads/sqoop2
Commit: fdc0469505583ed5730566fe5797103c8c5fe4e1
Parents: 5f58a1d
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Dec 8 17:12:36 2014 -0600
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Dec 8 17:13:09 2014 -0600

----------------------------------------------------------------------
 .../sqoop/json/util/SchemaSerialization.java    |  16 +--
 .../org/apache/sqoop/schema/type/DateTime.java  |  49 ++++----
 .../java/org/apache/sqoop/schema/type/Time.java |  27 +++--
 .../json/util/TestSchemaSerialization.java      |   4 +-
 .../connector/jdbc/util/SqlTypesUtils.java      |   4 +-
 .../idf/CSVIntermediateDataFormat.java          |  78 +++++++++++--
 .../idf/TestCSVIntermediateDataFormat.java      | 111 ++++++++++++++-----
 7 files changed, 197 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
index 2f5fadc..fa9cd5c 100644
--- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
+++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
@@ -153,8 +153,8 @@ public class SchemaSerialization {
       ret.put(CHAR_SIZE, ((AbstractString) column).getCharSize());
       break;
     case DATE_TIME:
-      ret.put(FRACTION, ((DateTime) column).getFraction());
-      ret.put(TIMEZONE, ((DateTime) column).getTimezone());
+      ret.put(FRACTION, ((DateTime) column).hasFraction());
+      ret.put(TIMEZONE, ((DateTime) column).hasTimezone());
       break;
     case DECIMAL:
       ret.put(PRECISION, ((Decimal) column).getPrecision());
@@ -168,7 +168,7 @@ public class SchemaSerialization {
       ret.put(BYTE_SIZE, ((FloatingPoint) column).getByteSize());
       break;
     case TIME:
-      ret.put(FRACTION, ((Time) column).getFraction());
+      ret.put(FRACTION, ((Time) column).hasFraction());
       break;
     case UNKNOWN:
       ret.put(JDBC_TYPE, ((Unknown) column).getJdbcType());
@@ -234,9 +234,9 @@ public class SchemaSerialization {
       output = new Date(name);
       break;
     case DATE_TIME:
-      Boolean fraction = (Boolean) obj.get(FRACTION);
-      Boolean timezone = (Boolean) obj.get(TIMEZONE);
-      output = new DateTime(name).setFraction(fraction).setTimezone(timezone);
+      Boolean hasFraction = (Boolean) obj.get(FRACTION);
+      Boolean hasTimezone = (Boolean) obj.get(TIMEZONE);
+      output = new DateTime(name, hasFraction, hasTimezone);
       break;
     case DECIMAL:
       Long precision = (Long) obj.get(PRECISION);
@@ -266,8 +266,8 @@ public class SchemaSerialization {
       output = new Text(name).setCharSize(charSize);
       break;
     case TIME:
-      Boolean timeFraction = (Boolean) obj.get(FRACTION);
-      output = new Time(name).setFraction(timeFraction);
+      Boolean hasTimeFraction = (Boolean) obj.get(FRACTION);
+      output = new Time(name, hasTimeFraction);
       break;
     case UNKNOWN:
       Long jdbcType = (Long) obj.get(JDBC_TYPE);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/common/src/main/java/org/apache/sqoop/schema/type/DateTime.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/DateTime.java b/common/src/main/java/org/apache/sqoop/schema/type/DateTime.java
index 791542a..5e237a6 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/DateTime.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/DateTime.java
@@ -25,47 +25,38 @@ package org.apache.sqoop.schema.type;
 public class DateTime extends AbstractDateTime {
 
   /**
-   * The column can contain fractions of seconds.
+   * The column can contain fractions of seconds (a.k.a nanos)
    */
-  private Boolean fraction;
+  private Boolean hasFraction;
 
   /**
    * The column do have encoded timezone.
    */
-  private Boolean timezone;
+  private Boolean hasTimezone;
 
-  public DateTime(String name) {
+  public DateTime(String name, Boolean hasFraction, Boolean hasTimezone) {
     super(name);
+    this.hasFraction = hasFraction;
+    this.hasTimezone = hasTimezone;
   }
 
-  public DateTime(String name, Boolean fraction, Boolean timezone) {
-    super(name);
-    this.fraction = fraction;
-    this.timezone = timezone;
-  }
-
-  public DateTime(String name, Boolean nullable, Boolean fraction, Boolean timezone) {
+  public DateTime(String name, Boolean nullable, Boolean hasFraction, Boolean hasTimezone) {
     super(name, nullable);
-    this.fraction = fraction;
-    this.timezone = timezone;
+    this.hasFraction = hasFraction;
+    this.hasTimezone = hasTimezone;
   }
 
-  public Boolean getFraction() {
-    return fraction;
+  public Boolean hasFraction() {
+    return hasFraction;
   }
 
   public DateTime setFraction(Boolean fraction) {
-    this.fraction = fraction;
+    this.hasFraction = fraction;
     return this;
   }
 
-  public Boolean getTimezone() {
-    return timezone;
-  }
-
-  public DateTime setTimezone(Boolean timezone) {
-    this.timezone = timezone;
-    return this;
+  public Boolean hasTimezone() {
+    return hasTimezone;
   }
 
   @Override
@@ -77,8 +68,8 @@ public class DateTime extends AbstractDateTime {
   public String toString() {
     return new StringBuilder("Date{")
       .append(super.toString())
-      .append(",fraction=").append(fraction)
-      .append(",timezone=").append(timezone)
+      .append(",hasFraction=").append(hasFraction)
+      .append(",hasTimezone=").append(hasTimezone)
       .append("}")
       .toString();
   }
@@ -91,9 +82,9 @@ public class DateTime extends AbstractDateTime {
 
     DateTime dateTime = (DateTime) o;
 
-    if (fraction != null ? !fraction.equals(dateTime.fraction) : dateTime.fraction != null)
+    if (hasFraction != null ? !hasFraction.equals(dateTime.hasFraction) : dateTime.hasFraction != null)
       return false;
-    if (timezone != null ? !timezone.equals(dateTime.timezone) : dateTime.timezone != null)
+    if (hasTimezone != null ? !hasTimezone.equals(dateTime.hasTimezone) : dateTime.hasTimezone != null)
       return false;
 
     return true;
@@ -102,8 +93,8 @@ public class DateTime extends AbstractDateTime {
   @Override
   public int hashCode() {
     int result = super.hashCode();
-    result = 31 * result + (fraction != null ? fraction.hashCode() : 0);
-    result = 31 * result + (timezone != null ? timezone.hashCode() : 0);
+    result = 31 * result + (hasFraction != null ? hasFraction.hashCode() : 0);
+    result = 31 * result + (hasTimezone != null ? hasTimezone.hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/common/src/main/java/org/apache/sqoop/schema/type/Time.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Time.java b/common/src/main/java/org/apache/sqoop/schema/type/Time.java
index 9765eab..e9adce9 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/Time.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Time.java
@@ -24,28 +24,27 @@ package org.apache.sqoop.schema.type;
  */
 public class Time extends AbstractDateTime {
 
-  private Boolean fraction;
+  /**
+   * The column can contain fractions of seconds (a.k.a nanos)
+   */
+  private Boolean hasFraction;
 
-  public Time(String name) {
+  public Time(String name, Boolean hasFraction) {
     super(name);
-  }
-
-  public Time(String name, Boolean fraction) {
-    super(name);
-    this.fraction = fraction;
+    this.hasFraction = hasFraction;
   }
 
   public Time(String name, Boolean nullable, Boolean fraction) {
     super(name, nullable);
-    this.fraction = fraction;
+    this.hasFraction = fraction;
   }
 
-  public Boolean getFraction() {
-    return fraction;
+  public Boolean hasFraction() {
+    return hasFraction;
   }
 
   public Time setFraction(Boolean fraction) {
-    this.fraction = fraction;
+    this.hasFraction = fraction;
     return this;
   }
 
@@ -58,7 +57,7 @@ public class Time extends AbstractDateTime {
   public String toString() {
     return new StringBuilder("Time{")
       .append(super.toString())
-      .append(",fraction=").append(fraction)
+      .append(",hasFraction=").append(hasFraction)
       .append("}")
       .toString();
   }
@@ -71,7 +70,7 @@ public class Time extends AbstractDateTime {
 
     Time time = (Time) o;
 
-    if (fraction != null ? !fraction.equals(time.fraction) : time.fraction != null)
+    if (hasFraction != null ? !hasFraction.equals(time.hasFraction) : time.hasFraction != null)
       return false;
 
     return true;
@@ -80,7 +79,7 @@ public class Time extends AbstractDateTime {
   @Override
   public int hashCode() {
     int result = super.hashCode();
-    result = 31 * result + (fraction != null ? fraction.hashCode() : 0);
+    result = 31 * result + (hasFraction != null ? hasFraction.hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
index 3d31600..f047556 100644
--- a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
+++ b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
@@ -159,7 +159,7 @@ public class TestSchemaSerialization {
       .addColumn(new Binary("b"))
       .addColumn(new Bit("c"))
       .addColumn(new Date("d"))
-      .addColumn(new DateTime("e"))
+      .addColumn(new DateTime("e", true, true))
       .addColumn(new Decimal("f"))
       .addColumn(new Enum("g", Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(new String[] { "X", "Y" })))))
       .addColumn(new FixedPoint("h"))
@@ -167,7 +167,7 @@ public class TestSchemaSerialization {
       .addColumn(new Map("j", new Text("j1"), new Text("j2")))
       .addColumn(new Set("k", new Text("k1")))
       .addColumn(new Text("l"))
-      .addColumn(new Time("m"))
+      .addColumn(new Time("m", true))
       .addColumn(new Unknown("u"))
     ;
     transferAndAssert(allTypes);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
index 9d50dc3..9cfee46 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
@@ -64,10 +64,10 @@ public class SqlTypesUtils {
         return new Date(columnName);
 
       case Types.TIME:
-        return new Time(columnName);
+        return new Time(columnName, true);
 
       case Types.TIMESTAMP:
-        return new DateTime(columnName);
+        return new DateTime(columnName, true, false);
 
       case Types.FLOAT:
       case Types.REAL:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index daa51eb..d6470e6 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -27,7 +27,6 @@ import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
-import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalTime;
 import org.joda.time.format.DateTimeFormat;
@@ -88,9 +87,17 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   // ISO-8859-1 is an 8-bit codec that is supported in every java implementation.
   static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
   // http://www.joda.org/joda-time/key_format.html provides details on the formatter token
-  static final DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSZ");
+  // date-time formatters: the value may carry a fraction of a second and/or a timezone offset
+  static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ");
+  static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+  static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
+  static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ");
+
+  // only date, no time
   static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
-  static final DateTimeFormatter tf = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
+  // time only, with or without a fraction of a second; never a timezone
+  static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
+  static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
 
   private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> bitTypeColumnIndices = new ArrayList<Integer>();
@@ -290,11 +297,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       returnValue = LocalTime.parse(removeQuotes(fieldString));
       break;
     case DATE_TIME:
-      // A datetime string with a space as date-time separator will not be
-      // parsed expectedly. The expected separator is "T". See also:
-      // https://github.com/JodaOrg/joda-time/issues/11
-      String dateTime = removeQuotes(fieldString).replace(" ", "T");
-      returnValue = DateTime.parse(dateTime);
+      returnValue = parseDateTime(fieldString, column);
       break;
     case BIT:
       if ((TRUE_BIT_SET.contains(fieldString)) || (FALSE_BIT_SET.contains(fieldString))) {
@@ -318,6 +321,27 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return returnValue;
   }
 
+  private Object parseDateTime(String fieldString, Column column) {
+    Object returnValue;
+    String dateTime = removeQuotes(fieldString);
+    org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+    if (col.hasFraction() && col.hasTimezone()) {
+      // After calling withOffsetParsed method, a string
+      // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+      // -08:00 (a fixed zone, with no daylight savings rules)
+      returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime);
+    } else if (col.hasFraction() && !col.hasTimezone()) {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime);
+    } else if (col.hasTimezone()) {
+      returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime);
+    } else {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime);
+    }
+    return returnValue;
+  }
+
   private Object[] parseListElementFromJSON(String fieldString) {
 
     JSONArray array = null;
@@ -454,10 +478,14 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       objectArray[i] = escapeString((String) objectArray[i]);
     }
     for (int i : dateTimeTypeColumnIndices) {
+      Column col = columnArray[i];
       if (objectArray[i] instanceof org.joda.time.DateTime) {
-        objectArray[i] = encloseWithQuote(dtf.print((org.joda.time.DateTime) objectArray[i]));
+        org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
+        // check for fraction and time zone and then use the right formatter
+        formatDateTime(objectArray, i, col, dateTime);
       } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-        objectArray[i] = encloseWithQuote(dtf.print((org.joda.time.LocalDateTime) objectArray[i]));
+        org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
+        formatLocalDateTime(objectArray, i, col, localDateTime);
       }
     }
     for (int i : dateTypeColumnIndices) {
@@ -465,8 +493,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       objectArray[i] = encloseWithQuote(df.print(date));
     }
     for (int i : timeColumnIndices) {
-      org.joda.time.LocalTime date = (org.joda.time.LocalTime) objectArray[i];
-      objectArray[i] = encloseWithQuote(tf.print(date));
+      Column col = columnArray[i];
+      if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
+        objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
+      } else {
+        objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
+      }
     }
     for (int i : byteTypeColumnIndices) {
       objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
@@ -479,6 +511,28 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     }
   }
 
+  private void formatLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
+    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
+    if (column.hasFraction()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
+    } else {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
+    }
+  }
+
+  private void formatDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
+    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
+    if (column.hasFraction() && column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
+    } else if (column.hasFraction() && !column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
+    } else if (column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
+    } else {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private String encodeMap(Map<Object, Object> map, Column column) {
     JSONObject object = new JSONObject();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fdc04695/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index e66c897..3e7c0d1 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -42,6 +42,7 @@ import org.apache.sqoop.schema.type.DateTime;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.Text;
 import org.apache.sqoop.schema.type.Time;
+import org.joda.time.DateTimeZone;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -278,7 +279,7 @@ public class TestCSVIntermediateDataFormat {
   @Test
   public void testTimeWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new Time("1"));
+    schema.addColumn(new Time("1", false));
     dataFormat.setSchema(schema);
     dataFormat.setTextData("'12:00:00'");
     assertEquals("'12:00:00'", dataFormat.getTextData());
@@ -287,7 +288,7 @@ public class TestCSVIntermediateDataFormat {
   @Test
   public void testTimeWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new Time("1"));
+    schema.addColumn(new Time("1", false));
     dataFormat.setSchema(schema);
     dataFormat.setTextData("'12:59:59'");
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
@@ -297,7 +298,7 @@ public class TestCSVIntermediateDataFormat {
   @Test
   public void testTimeWithObjectArrayInCSVTextOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new Time("1")).addColumn(new Text("2"));
+    schema.addColumn(new Time("1", true)).addColumn(new Text("2"));
     dataFormat.setSchema(schema);
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
     Object[] in = { time, "test" };
@@ -308,7 +309,7 @@ public class TestCSVIntermediateDataFormat {
   @Test
   public void testTimeWithObjectArrayInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new Time("1"));
+    schema.addColumn(new Time("1", true));
     dataFormat.setSchema(schema);
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(2, 23, 33);
     Object[] in = { time };
@@ -364,7 +365,7 @@ public class TestCSVIntermediateDataFormat {
   @Test
   public void testDateTimeWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", false, false));
     dataFormat.setSchema(schema);
 
     dataFormat.setTextData("'2014-10-01 12:00:00'");
@@ -372,64 +373,124 @@ public class TestCSVIntermediateDataFormat {
   }
 
   @Test
-  public void testDateTimeWithMilliSecsWithCSVTextInCSVTextOut() {
+  public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
-
     dataFormat.setTextData("'2014-10-01 12:00:00.000'");
     assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getTextData());
   }
 
+  public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new DateTime("1", false, false));
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("'2014-10-01 12:00:00'");
+    // NOTE: string representation will have the T added, it is an
+    // implementation quirk of using JODA
+    assertEquals("2014-10-01T12:00:00", dataFormat.getObjectData()[0].toString());
+  }
+
   @Test
-  public void testDateTimeWithCSVTextInObjectArrayOut() {
+  public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
+    dataFormat.setTextData("'2014-10-01 12:00:00.000'");
+    // NOTE: string representation will have the T added, it is an
+    // implementation quirk of using JODA
+    assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
+  }
 
-    dataFormat.setTextData("'2014-10-01 12:00:00'");
-    assertEquals("2014-10-01T12:00:00.000-07:00", dataFormat.getObjectData()[0].toString());
+  // since date is not quoted
+  @Test(expected = Exception.class)
+  public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new DateTime("1", true, true));
+    dataFormat.setSchema(schema);
+    DateTimeZone zone = DateTimeZone.forID("America/New_York");
+    org.joda.time.DateTime dateTime = new org.joda.time.DateTime(zone);
+    dataFormat.setTextData(dateTime.toString());
+    dataFormat.getObjectData()[0].toString();
+  }
+
+  // since date is not in expected format
+  @Test(expected = Exception.class)
+  public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new DateTime("1", true, true));
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("'2014-3310-01 12:00:00.000'");
+    dataFormat.getObjectData()[0].toString();
   }
 
   @Test
-  public void testDateTimeWithObjectInCSVTextOut() {
+  public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
-    org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 1);
+    // current date time
+    org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
+    String dateTimeString = CSVIntermediateDataFormat.dtfWithFractionNoTimeZone.print(dateTime);
+    dataFormat.setTextData("'" + dateTimeString + "'");
+    assertEquals(dateTimeString.replace(" ", "T"), dataFormat.getObjectData()[0].toString());
+  }
+
+  @Test
+  public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new DateTime("1", true, true));
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("'2014-10-01 12:00:00.000-0400'");
+    // NOTE: string representation will have the T added, it is an
+    // implementation quirk of using JODA
+    assertEquals("2014-10-01T12:00:00.000-04:00", dataFormat.getObjectData()[0].toString());
+  }
+
+  @Test
+  public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new DateTime("1", true, true));
+    dataFormat.setSchema(schema);
+    DateTimeZone zone = DateTimeZone.forID("America/New_York");
+    org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 1, zone);
     Object[] in = { dateTime };
     dataFormat.setObjectData(in);
     // Note: DateTime has the timezone info
-    assertEquals("'2014-10-01 12:00:00.001000-0700'", dataFormat.getTextData());
+    assertEquals("'2014-10-01 12:00:00.001-0400'", dataFormat.getTextData());
   }
 
   @Test
   public void testLocalDateTimeWithObjectInCSVTextOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
     org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0, 2);
     Object[] in = { dateTime };
     dataFormat.setObjectData(in);
-    // Note: LocalDateTime is missing the timezone info
-    assertEquals("'2014-10-01 12:00:00.002000'", dataFormat.getTextData());
+    // Note: LocalDateTime will not have the timezone info
+    assertEquals("'2014-10-01 12:00:00.002'", dataFormat.getTextData());
   }
 
   @Test
-  public void testDateTimePrecisionWithCSVTextInObjectArrayOut() {
+  public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new DateTime("1"));
+    schema.addColumn(new DateTime("1", true, true));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00.000'");
-    org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0);
-    assertEquals(dateTime, dataFormat.getObjectData()[0]);
+    dataFormat.setTextData("'2014-10-01 12:00:00.000-04:00'");
+    DateTimeZone zone = DateTimeZone.forID("America/New_York");
+    org.joda.time.DateTime edateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0, zone);
+    org.joda.time.DateTime dateTime = (org.joda.time.DateTime) dataFormat.getObjectData()[0];
+    assertEquals(edateTime.toString(), dateTime.toString());
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
-    assertEquals("2014-10-01T12:00:00.000-07:00", dataFormat.getObjectData()[0].toString());
+    assertEquals("2014-10-01T12:00:00.000-04:00", dataFormat.getObjectData()[0].toString());
   }
 
   // **************test cases for BIT*******************
 
+  // **************test cases for BIT*******************
+
   @Test
   public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
     Schema schema = new Schema("test");