You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/20 18:05:31 UTC

[08/47] phoenix git commit: PHOENIX-2946 Projected comparison between date and timestamp columns always returns true

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedLong.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedLong.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedLong.java
index a21ccc3..1ee46fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedLong.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedLong.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema.types;
 
 import java.math.BigDecimal;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Order;
 import org.apache.phoenix.schema.SortOrder;
@@ -37,163 +38,175 @@ import com.google.common.primitives.Longs;
  */
 public class PUnsignedLong extends PWholeNumber<Long> {
 
-  public static final PUnsignedLong INSTANCE = new PUnsignedLong();
-
-  private PUnsignedLong() {
-    super("UNSIGNED_LONG", 10 /* no constant available in Types */, Long.class,
-        new UnsignedLongCodec(), 15);
-  }
-
-  @Override
-  public boolean isOrderPreserving() {
-    return true;
-  }
-
-  @Override
-  public Order getOrder() {
-    return Order.ASCENDING;
-  }
-
-  @Override
-  public boolean isSkippable() {
-    return true;
-  }
-
-  @Override
-  public Integer getScale(Object o) {
-    return ZERO;
-  }
-
-  @Override
-  public byte[] toBytes(Object object) {
-    byte[] b = new byte[Bytes.SIZEOF_LONG];
-    toBytes(object, b, 0);
-    return b;
-  }
-
-  @Override
-  public int toBytes(Object object, byte[] b, int o) {
-    if (object == null) {
-      throw newIllegalDataException(this + " may not be null");
-    }
-    return this.getCodec().encodeLong(((Number) object).longValue(), b, o);
-  }
-
-  @Override
-  public Object toObject(Object object, PDataType actualType) {
-    Long v = (Long) PLong.INSTANCE.toObject(object, actualType);
-    throwIfNonNegativeNumber(v);
-    return v;
-  }
-
-  @Override
-  public Object toObject(byte[] b, int o, int l, PDataType actualType, SortOrder sortOrder,
-      Integer maxLength, Integer scale) {
-    Long v = (Long) PLong.INSTANCE.toObject(b, o, l, actualType, sortOrder);
-    throwIfNonNegativeNumber(v);
-    return v;
-  }
-
-  @Override
+    public static final PUnsignedLong INSTANCE = new PUnsignedLong();
+
+    private PUnsignedLong() {
+        super("UNSIGNED_LONG", 10 /* no constant available in Types */, Long.class,
+                new UnsignedLongCodec(), 15);
+    }
+
+    @Override
+    public boolean isOrderPreserving() {
+        return true;
+    }
+
+    @Override
+    public Order getOrder() {
+        return Order.ASCENDING;
+    }
+
+    @Override
+    public boolean isSkippable() {
+        return true;
+    }
+
+    @Override
+    public Integer getScale(Object o) {
+        return ZERO;
+    }
+
+    @Override
+    public byte[] toBytes(Object object) {
+        byte[] b = new byte[Bytes.SIZEOF_LONG];
+        toBytes(object, b, 0);
+        return b;
+    }
+
+    @Override
+    public int toBytes(Object object, byte[] b, int o) {
+        if (object == null) {
+            throw newIllegalDataException(this + " may not be null");
+        }
+        return this.getCodec().encodeLong(((Number) object).longValue(), b, o);
+    }
+
+    @Override
+    public Object toObject(Object object, PDataType actualType) {
+        Long v = (Long) PLong.INSTANCE.toObject(object, actualType);
+        throwIfNonNegativeNumber(v);
+        return v;
+    }
+
+    @Override
+    public Object toObject(byte[] b, int o, int l, PDataType actualType, SortOrder sortOrder,
+            Integer maxLength, Integer scale) {
+        Long v = (Long) PLong.INSTANCE.toObject(b, o, l, actualType, sortOrder);
+        throwIfNonNegativeNumber(v);
+        return v;
+    }
+
+    @Override
     public boolean isCastableTo(PDataType targetType) {
-      return super.isCastableTo(targetType) || targetType.isCoercibleTo(PTimestamp.INSTANCE);
-    }
-
-  @Override
-  public boolean isCoercibleTo(PDataType targetType) {
-    return targetType == this || targetType == PUnsignedDouble.INSTANCE || PLong.INSTANCE
-        .isCoercibleTo(targetType);
-  }
-
-  @Override
-  public boolean isCoercibleTo(PDataType targetType, Object value) {
-    return super.isCoercibleTo(targetType, value) || PLong.INSTANCE.isCoercibleTo(targetType, value);
-  }
-
-  @Override
-  public boolean isFixedWidth() {
-    return true;
-  }
-
-  @Override
-  public Integer getByteSize() {
-    return Bytes.SIZEOF_LONG;
-  }
-
-  @Override
-  public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
-    if (rhsType == PDecimal.INSTANCE) {
-      return -((BigDecimal) rhs).compareTo(BigDecimal.valueOf(((Number) lhs).longValue()));
-    } else if (equalsAny(rhsType, PDouble.INSTANCE, PFloat.INSTANCE, PUnsignedDouble.INSTANCE,
-        PUnsignedFloat.INSTANCE)) {
-      return Doubles.compare(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
-    }
-    return Longs.compare(((Number) lhs).longValue(), ((Number) rhs).longValue());
-  }
-
-  @Override
-  public boolean isComparableTo(PDataType targetType) {
-    return PDecimal.INSTANCE.isComparableTo(targetType);
-  }
-
-  @Override
-  public Object toObject(String value) {
-    if (value == null || value.length() == 0) {
-      return null;
-    }
-    try {
-      Long l = Long.parseLong(value);
-      if (l.longValue() < 0) {
-        throw newIllegalDataException("Value may not be negative(" + l + ")");
-      }
-      return l;
-    } catch (NumberFormatException e) {
-      throw newIllegalDataException(e);
-    }
-  }
-
-  @Override
-  public int getResultSetSqlType() {
-    return PLong.INSTANCE.getResultSetSqlType();
-  }
-
-  @Override
-  public Object getSampleValue(Integer maxLength, Integer arrayLength) {
-    return Math.abs((Long) PLong.INSTANCE.getSampleValue(maxLength, arrayLength));
-  }
-
-  static class UnsignedLongCodec extends PLong.LongCodec {
-
-    @Override
-    public long decodeLong(byte[] b, int o, SortOrder sortOrder) {
-      Preconditions.checkNotNull(sortOrder);
-      checkForSufficientLength(b, o, Bytes.SIZEOF_LONG);
-      long v = 0;
-      if (sortOrder == SortOrder.ASC) {
-        for (int i = o; i < o + Bytes.SIZEOF_LONG; i++) {
-          v <<= 8;
-          v ^= b[i] & 0xFF;
+        return super.isCastableTo(targetType) || targetType.isCoercibleTo(PTimestamp.INSTANCE);
+    }
+
+    @Override
+    public boolean isCoercibleTo(PDataType targetType) {
+        return targetType == this || targetType == PUnsignedDouble.INSTANCE || PLong.INSTANCE
+                .isCoercibleTo(targetType);
+    }
+
+    @Override
+    public boolean isCoercibleTo(PDataType targetType, Object value) {
+        return super.isCoercibleTo(targetType, value) || PLong.INSTANCE.isCoercibleTo(targetType, value);
+    }
+
+    @Override
+    public void coerceBytes(ImmutableBytesWritable ptr, Object object, PDataType actualType,
+            Integer maxLength, Integer scale, SortOrder actualModifier, Integer desiredMaxLength, Integer desiredScale,
+            SortOrder expectedModifier) {
+        // Keep only the leading LONG-sized bytes of a wider value (e.g. TIMESTAMP), then coerce as usual
+        if (ptr.getLength() > getByteSize()) { // NOTE(review): only the length is checked, not actualType - confirm this is intended
+            ptr.set(ptr.get(), ptr.getOffset(), getByteSize()); // same backing array and offset, shorter length
         }
-      } else {
-        for (int i = o; i < o + Bytes.SIZEOF_LONG; i++) {
-          v <<= 8;
-          v ^= (b[i] & 0xFF) ^ 0xFF;
+        super.coerceBytes(ptr, object, actualType, maxLength, scale, actualModifier, desiredMaxLength,
+                desiredScale, expectedModifier);
+    }
+
+    @Override
+    public boolean isFixedWidth() {
+        return true;
+    }
+
+    @Override
+    public Integer getByteSize() {
+        return Bytes.SIZEOF_LONG;
+    }
+
+    @Override
+    public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
+        if (rhsType == PDecimal.INSTANCE) { // DECIMAL rhs: compare as BigDecimal
+            return -((BigDecimal) rhs).compareTo(BigDecimal.valueOf(((Number) lhs).longValue())); // rhs is the receiver, so negate to get lhs-vs-rhs order
+        } else if (equalsAny(rhsType, PDouble.INSTANCE, PFloat.INSTANCE, PUnsignedDouble.INSTANCE,
+                PUnsignedFloat.INSTANCE)) { // floating-point rhs: compare both sides as doubles
+            return Doubles.compare(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
         }
-      }
-      if (v < 0) {
-        throw newIllegalDataException();
-      }
-      return v;
+        return Longs.compare(((Number) lhs).longValue(), ((Number) rhs).longValue()); // every other rhs type: compare both sides as longs
     }
 
     @Override
-    public int encodeLong(long v, byte[] b, int o) {
-      checkForSufficientLength(b, o, Bytes.SIZEOF_LONG);
-      if (v < 0) {
-        throw newIllegalDataException();
-      }
-      Bytes.putLong(b, o, v);
-      return Bytes.SIZEOF_LONG;
+    public boolean isComparableTo(PDataType targetType) {
+        return PDecimal.INSTANCE.isComparableTo(targetType);
+    }
+
+    @Override
+    public Object toObject(String value) {
+        if (value == null || value.length() == 0) {
+            return null;
+        }
+        try {
+            Long l = Long.parseLong(value);
+            if (l.longValue() < 0) {
+                throw newIllegalDataException("Value may not be negative(" + l + ")");
+            }
+            return l;
+        } catch (NumberFormatException e) {
+            throw newIllegalDataException(e);
+        }
+    }
+
+    @Override
+    public int getResultSetSqlType() {
+        return PLong.INSTANCE.getResultSetSqlType();
+    }
+
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        return Math.abs((Long) PLong.INSTANCE.getSampleValue(maxLength, arrayLength));
+    }
+
+    static class UnsignedLongCodec extends PLong.LongCodec { // long codec that refuses negative values when decoding and encoding
+
+        @Override
+        public long decodeLong(byte[] b, int o, SortOrder sortOrder) {
+            Preconditions.checkNotNull(sortOrder);
+            checkForSufficientLength(b, o, Bytes.SIZEOF_LONG);
+            long v = 0;
+            if (sortOrder == SortOrder.ASC) {
+                for (int i = o; i < o + Bytes.SIZEOF_LONG; i++) {
+                    v <<= 8; // make room for the next byte; b[o] ends up most significant
+                    v ^= b[i] & 0xFF; // mask to avoid sign extension of the byte
+                }
+            } else {
+                for (int i = o; i < o + Bytes.SIZEOF_LONG; i++) {
+                    v <<= 8;
+                    v ^= (b[i] & 0xFF) ^ 0xFF; // non-ASC order: bit-invert each byte before appending it
+                }
+            }
+            if (v < 0) { // a set sign bit means the bytes do not hold a valid unsigned value
+                throw newIllegalDataException();
+            }
+            return v;
+        }
+
+        @Override
+        public int encodeLong(long v, byte[] b, int o) {
+            checkForSufficientLength(b, o, Bytes.SIZEOF_LONG);
+            if (v < 0) { // negative values are rejected before anything is written
+                throw newIllegalDataException();
+            }
+            Bytes.putLong(b, o, v);
+            return Bytes.SIZEOF_LONG; // fixed width: always reports SIZEOF_LONG
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTime.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTime.java
index 4173be1..db0ba0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTime.java
@@ -94,7 +94,7 @@ public class PUnsignedTime extends PDataType<Time> {
 
   @Override
   public boolean isBytesComparableWith(PDataType otherType) {
-    return super.isBytesComparableWith(otherType) || otherType == PUnsignedDate.INSTANCE;
+    return super.isBytesComparableWith(otherType) || otherType == PUnsignedDate.INSTANCE || otherType == PUnsignedTimestamp.INSTANCE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTimestamp.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTimestamp.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTimestamp.java
index 29154b0..08b28c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTimestamp.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PUnsignedTimestamp.java
@@ -17,132 +17,60 @@
  */
 package org.apache.phoenix.schema.types;
 
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.Format;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.util.DateUtil;
-
-public class PUnsignedTimestamp extends PDataType<Timestamp> {
 
-  public static final PUnsignedTimestamp INSTANCE = new PUnsignedTimestamp();
+public class PUnsignedTimestamp extends PTimestamp {
 
-  private PUnsignedTimestamp() {
-    super("UNSIGNED_TIMESTAMP", 20, java.sql.Timestamp.class, new PUnsignedDate.UnsignedDateCodec(), 12);
-  }
+    public static final PUnsignedTimestamp INSTANCE = new PUnsignedTimestamp();
 
-  @Override
-  public byte[] toBytes(Object object) {
-    if (object == null) {
-      throw newIllegalDataException(this + " may not be null");
+    private PUnsignedTimestamp() {
+        super("UNSIGNED_TIMESTAMP", 20, 12);
     }
-    byte[] bytes = new byte[getByteSize()];
-    toBytes(object, bytes, 0);
-    return bytes;
-  }
 
-  @Override
-  public int toBytes(Object object, byte[] bytes, int offset) {
-    if (object == null) {
-      throw newIllegalDataException(this + " may not be null");
+    @Override
+    public boolean isBytesComparableWith(PDataType otherType) {
+        return equalsAny(this, otherType, PVarbinary.INSTANCE, PBinary.INSTANCE, PUnsignedTime.INSTANCE, PUnsignedDate.INSTANCE, PUnsignedLong.INSTANCE); // NOTE(review): `this` is passed first; PUnsignedLong.compareTo calls equalsAny(rhsType, ...) with the tested type first - confirm otherType was not meant to lead here
     }
-    java.sql.Timestamp value = (java.sql.Timestamp) object;
-    PUnsignedDate.INSTANCE.getCodec().encodeLong(value.getTime(), bytes, offset);
-
-            /*
-             * By not getting the stuff that got spilled over from the millis part,
-             * it leaves the timestamp's byte representation saner - 8 bytes of millis | 4 bytes of nanos.
-             * Also, it enables timestamp bytes to be directly compared with date/time bytes.
-             */
-    Bytes.putInt(bytes, offset + Bytes.SIZEOF_LONG, value.getNanos() % 1000000);
-    return getByteSize();
-  }
-
-  @Override
-  public Object toObject(Object object, PDataType actualType) {
-    java.sql.Timestamp ts = (java.sql.Timestamp) PTimestamp.INSTANCE.toObject(object, actualType);
-    throwIfNonNegativeDate(ts);
-    return ts;
-  }
-
-  @Override
-  public Object toObject(byte[] b, int o, int l, PDataType actualType, SortOrder sortOrder,
-      Integer maxLength, Integer scale) {
-    java.sql.Timestamp ts =
-        (java.sql.Timestamp) PTimestamp.INSTANCE.toObject(b, o, l, actualType, sortOrder);
-    throwIfNonNegativeDate(ts);
-    return ts;
-  }
-
-  @Override
-  public boolean isCastableTo(PDataType targetType) {
-    return PUnsignedDate.INSTANCE.isCastableTo(targetType);
-  }
-
-  @Override
-  public boolean isCoercibleTo(PDataType targetType) {
-    return targetType.equals(this) || PUnsignedDate.INSTANCE.isCoercibleTo(targetType);
-  }
-
-  @Override
-  public boolean isCoercibleTo(PDataType targetType, Object value) {
-    return super.isCoercibleTo(targetType, value) || PTimestamp.INSTANCE
-        .isCoercibleTo(targetType, value);
-  }
 
-  @Override
-  public boolean isFixedWidth() {
-    return true;
-  }
-
-  @Override
-  public Integer getByteSize() {
-    return PTimestamp.INSTANCE.getByteSize();
-  }
-
-  @Override
-  public int compareTo(Object lhs, Object rhs, PDataType rhsType) {
-    return PTimestamp.INSTANCE.compareTo(lhs, rhs, rhsType);
-  }
+    @Override
+    public Object toObject(Object object, PDataType actualType) {
+        java.sql.Timestamp ts = (java.sql.Timestamp) super.toObject(object, actualType);
+        throwIfNonNegativeDate(ts);
+        return ts;
+    }
 
-  @Override
-  public Object toObject(String value) {
-    return PTimestamp.INSTANCE.toObject(value);
-  }
+    @Override
+    public boolean isCastableTo(PDataType targetType) {
+        return PUnsignedDate.INSTANCE.isCastableTo(targetType);
+    }
 
-  @Override
-  public String toStringLiteral(Object o, Format formatter) {
-    if (formatter == null) {
-      formatter = DateUtil.DEFAULT_TIMESTAMP_FORMATTER;
+    @Override
+    public boolean isCoercibleTo(PDataType targetType) {
+        return targetType.equals(this) || PUnsignedDate.INSTANCE.isCoercibleTo(targetType);
     }
-    return "'" + super.toStringLiteral(o, formatter) + "'";
-  }
 
-  @Override
-  public int getNanos(ImmutableBytesWritable ptr, SortOrder sortOrder) {
-    int nanos = PUnsignedInt.INSTANCE.getCodec()
-        .decodeInt(ptr.get(), ptr.getOffset() + PLong.INSTANCE.getByteSize(), sortOrder);
-    return nanos;
-  }
+    @Override
+    public boolean isCoercibleTo(PDataType targetType, Object value) {
+        return super.isCoercibleTo(targetType, value) || PTimestamp.INSTANCE
+                .isCoercibleTo(targetType, value);
+    }
 
-  @Override
-  public long getMillis(ImmutableBytesWritable ptr, SortOrder sortOrder) {
-    long millis =
-        PUnsignedLong.INSTANCE.getCodec().decodeLong(ptr.get(), ptr.getOffset(), sortOrder);
-    return millis;
-  }
+    @Override
+    public int getResultSetSqlType() {
+        return PTimestamp.INSTANCE.getResultSetSqlType();
+    }
 
-  @Override
-  public int getResultSetSqlType() {
-    return Types.TIMESTAMP;
-  }
+    @Override
+    public int getNanos(ImmutableBytesWritable ptr, SortOrder sortOrder) {
+        int nanos = PUnsignedInt.INSTANCE.getCodec()
+                .decodeInt(ptr.get(), ptr.getOffset() + PLong.INSTANCE.getByteSize(), sortOrder);
+        return nanos;
+    }
 
-  @Override
-  public Object getSampleValue(Integer maxLength, Integer arrayLength) {
-    return new java.sql.Timestamp(
-        (Long) PUnsignedLong.INSTANCE.getSampleValue(maxLength, arrayLength));
-  }
+    @Override
+    public Object getSampleValue(Integer maxLength, Integer arrayLength) {
+        return new java.sql.Timestamp(
+                (Long) PUnsignedLong.INSTANCE.getSampleValue(maxLength, arrayLength));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/main/java/org/apache/phoenix/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/DateUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/DateUtil.java
index 557d8ed..7f62797 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/DateUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/DateUtil.java
@@ -32,9 +32,14 @@ import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.commons.lang.time.FastDateFormat;
-import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDataType.PDataCodec;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PUnsignedDate;
+import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.joda.time.DateTimeZone;
 import org.joda.time.chrono.ISOChronology;
 import org.joda.time.format.DateTimeFormatter;
@@ -42,6 +47,7 @@ import org.joda.time.format.DateTimeFormatterBuilder;
 import org.joda.time.format.ISODateTimeFormat;
 
 import com.google.common.collect.Lists;
+import com.sun.istack.NotNull;
 
 
 @SuppressWarnings({ "serial", "deprecation" })
@@ -74,6 +80,21 @@ public class DateUtil {
     private DateUtil() {
     }
 
+    @NotNull // NOTE(review): imported from com.sun.istack, not a standard annotation package - confirm the providing dependency is declared
+    public static PDataCodec getCodecFor(PDataType type) {
+        PDataCodec codec = type.getCodec(); // prefer the type's own codec when it has one
+        if (codec != null) {
+            return codec;
+        }
+        if (type == PTimestamp.INSTANCE) { // codec-less TIMESTAMP borrows the DATE codec
+            return PDate.INSTANCE.getCodec();
+        } else if (type == PUnsignedTimestamp.INSTANCE) { // codec-less UNSIGNED_TIMESTAMP borrows the UNSIGNED_DATE codec
+            return PUnsignedDate.INSTANCE.getCodec();
+        } else {
+            throw new RuntimeException(TypeMismatchException.newException(PTimestamp.INSTANCE, type)); // any other codec-less type: type mismatch against TIMESTAMP, wrapped as unchecked
+        }
+    }
+    
     private static TimeZone getTimeZone(String timeZoneId) {
         TimeZone parserTimeZone;
         if (timeZoneId == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
index cddafc6..0d3e17d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 import java.util.Properties;
+
 import javax.annotation.Nullable;
 
 import org.apache.commons.csv.CSVRecord;
@@ -30,6 +31,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDataType.PDataCodec;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.DateUtil;
@@ -112,6 +114,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
     static class SimpleDatatypeConversionFunction implements Function<String, Object> {
 
         private final PDataType dataType;
+        private final PDataCodec codec;
         private final DateUtil.DateTimeParser dateTimeParser;
 
         SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
@@ -122,7 +125,9 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
                 throw new RuntimeException(e);
             }
             this.dataType = dataType;
+            PDataCodec codec = dataType.getCodec();
             if(dataType.isCoercibleTo(PTimestamp.INSTANCE)) {
+                codec = DateUtil.getCodecFor(dataType);
                 // TODO: move to DateUtil
                 String dateFormat;
                 int dateSqlType = dataType.getResultSetSqlType();
@@ -142,6 +147,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
             } else {
                 this.dateTimeParser = null;
             }
+            this.codec = codec;
         }
 
         @Nullable
@@ -153,7 +159,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
             if (dateTimeParser != null) {
                 long epochTime = dateTimeParser.parseDateTime(input);
                 byte[] byteValue = new byte[dataType.getByteSize()];
-                dataType.getCodec().encodeLong(epochTime, byteValue, 0);
+                codec.encodeLong(epochTime, byteValue, 0);
                 return dataType.toObject(byteValue);
             } else if (dataType == PBoolean.INSTANCE) {
                 switch (input.toLowerCase()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/210445de/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
index 085af44..ba48a8a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/types/PDataTypeTest.java
@@ -1837,6 +1837,24 @@ public class PDataTypeTest {
     }
     
     @Test
+    public void testTimestampToDateComparison() { // compares serialized TIMESTAMP / UNSIGNED_TIMESTAMP bytes against serialized DATE bytes
+        long now = System.currentTimeMillis();
+        Timestamp ts1 = DateUtil.getTimestamp(now,  1111);    // presumably `now` millis plus 1111 nanos - verify against DateUtil.getTimestamp
+        final byte[] bytes1 = PTimestamp.INSTANCE.toBytes(ts1);
+        Date ts2 = new Date(now);
+        final byte[] bytes2 = PDate.INSTANCE.toBytes(ts2);
+        assertTrue(PTimestamp.INSTANCE.compareTo(bytes1, 0, bytes1.length, SortOrder.getDefault(), bytes2, 0, bytes2.length, SortOrder.getDefault(), PDate.INSTANCE) > 0); // ts1 (nanos arg 1111) must sort after the DATE built from the same `now`
+
+        Timestamp ts3 = DateUtil.getTimestamp(now,  0);    // nanos argument 0
+        final byte[] bytes3 = PTimestamp.INSTANCE.toBytes(ts3);
+        assertTrue(PTimestamp.INSTANCE.compareTo(bytes3, 0, bytes3.length, SortOrder.getDefault(), bytes2, 0, bytes2.length, SortOrder.getDefault(), PDate.INSTANCE) == 0); // with nanos argument 0 the TIMESTAMP must compare equal to the DATE
+
+        Timestamp ts4 = DateUtil.getTimestamp(now,  0);    // same inputs as ts3, serialized below as UNSIGNED_TIMESTAMP
+        final byte[] bytes4 = PUnsignedTimestamp.INSTANCE.toBytes(ts4);
+        assertTrue(PUnsignedTimestamp.INSTANCE.compareTo(bytes4, 0, bytes4.length, SortOrder.getDefault(), bytes2, 0, bytes2.length, SortOrder.getDefault(), PDate.INSTANCE) == 0); // UNSIGNED_TIMESTAMP bytes must also compare equal to the DATE bytes
+    }
+    
+    @Test
     public void testTimestamp() {
         long now = System.currentTimeMillis();
         Timestamp ts1 = DateUtil.getTimestamp(now,  1111);