You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2012/08/24 02:18:09 UTC

svn commit: r1376800 [4/6] - in /pig/trunk: ./ .eclipse.templates/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/compa...

Added: pig/trunk/src/org/apache/pig/builtin/SecondsBetween.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/SecondsBetween.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/SecondsBetween.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/SecondsBetween.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+
+/**
+ * <p>SecondsBetween returns the number of seconds between two DateTime objects</p>
+ *
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Date Format: http://en.wikipedia.org/wiki/ISO_8601</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dt2:datetime);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dt2: datetime}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,2008-02-01T00:00:00.000Z)
+ * (2008-02-06T02:06:02.000Z,2008-02-01T00:00:00.000Z)
+ * (2007-03-05T03:05:03.000Z,2008-02-01T00:00:00.000Z)
+ * ...
+ *
+ * diffs = FOREACH ISOin GENERATE YearsBetween(dt, dt2) AS years,
+ * MonthsBetween(dt, dt2) AS months,
+ * WeeksBetween(dt, dt2) AS weeks,
+ * DaysBetween(dt, dt2) AS days,
+ * HoursBetween(dt, dt2) AS hours,
+ * MinutesBetween(dt, dt2) AS mins,
+ * SecondsBetween(dt, dt2) AS secs,
+ * MilliSecondsBetween(dt, dt2) AS millis;
+ *
+ * DESCRIBE diffs;
+ * diffs: {years: long,months: long,weeks: long,days: long,hours: long,mins: long,secs: long,millis: long}
+ *
+ * DUMP diffs;
+ *
+ * (0L,11L,48L,341L,8185L,491107L,29466421L,29466421000L)
+ * (0L,0L,0L,5L,122L,7326L,439562L,439562000L)
+ * (0L,-10L,-47L,-332L,-7988L,-479334L,-28760097L,-28760097000L)
+ *
+ * </pre>
+ */
+public class SecondsBetween extends EvalFunc<Long> {
+
+    /**
+     * Computes the whole number of seconds obtained by subtracting the second
+     * DateTime from the first.
+     *
+     * @param input tuple holding two DateTime fields
+     * @return (first - second) in seconds, truncated toward zero and negative when the
+     *         first instant is the earlier one; null when the tuple is null, has fewer
+     *         than two fields, or either field is null
+     * @throws IOException propagated from reading the tuple fields
+     */
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 2) {
+            return null;
+        }
+        Object first = input.get(0);
+        Object second = input.get(1);
+        // A null field previously surfaced as a NullPointerException; yield null instead.
+        if (first == null || second == null) {
+            return null;
+        }
+
+        DateTime startDate = (DateTime) first;
+        DateTime endDate = (DateTime) second;
+
+        // The long subtraction is unchecked and may overflow for extreme instants.
+        return (startDate.getMillis() - endDate.getMillis()) / 1000L;
+    }
+
+    /**
+     * Declares a single long output field whose name is derived, via getSchemaName,
+     * from the lower-cased class name and the input schema.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.LONG));
+    }
+
+    /**
+     * Registers the only supported argument signature: (datetime, datetime).
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        return funcList;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/SubtractDuration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/SubtractDuration.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/SubtractDuration.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/SubtractDuration.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,77 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * <p>SubtractDuration returns the result of a DateTime object minus a Duration object</p>
+ *
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Duration Format: http://en.wikipedia.org/wiki/ISO_8601#Durations</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dr:chararray);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dr: chararray}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,PT1S)
+ * (2008-02-06T02:06:02.000Z,PT1M)
+ * (2007-03-05T03:05:03.000Z,P1D)
+ * ...
+ *
+ * dtsubtract = FOREACH ISOin GENERATE SubtractDuration(dt, dr) AS dt1;
+ *
+ * DESCRIBE dtsubtract;
+ * dtsubtract: {dt1: datetime}
+ *
+ * DUMP dtsubtract;
+ *
+ * (2009-01-07T01:07:00.000Z)
+ * (2008-02-06T02:05:02.000Z)
+ * (2007-03-04T03:05:03.000Z)
+ *
+ * </pre>
+ */
+public class SubtractDuration extends EvalFunc<DateTime> {
+
+    /**
+     * Subtracts the period described by the second field from the DateTime in the
+     * first field.
+     *
+     * @param input tuple holding a DateTime and a period string (for example "PT1M")
+     * @return the DateTime moved back by the period; null when the tuple is null, has
+     *         fewer than two fields, or either field is null
+     * @throws IOException propagated from reading the tuple fields
+     */
+    @Override
+    public DateTime exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 2) {
+            return null;
+        }
+        Object dateTime = input.get(0);
+        Object period = input.get(1);
+        // A null field previously surfaced as a NullPointerException; yield null instead.
+        if (dateTime == null || period == null) {
+            return null;
+        }
+
+        return ((DateTime) dateTime).minus(new Period((String) period));
+    }
+
+    /**
+     * Declares a single datetime output field whose name is derived, via getSchemaName,
+     * from the lower-cased class name and the input schema.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.DATETIME));
+    }
+
+    /**
+     * Registers the only supported argument signature: (datetime, chararray).
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        return funcList;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Fri Aug 24 00:18:05 2012
@@ -20,6 +20,8 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -120,6 +122,17 @@ public class TextLoader extends LoadFunc
         String msg = "TextLoader does not support conversion to Double.";
         throw new ExecException(msg, errCode, PigException.BUG);
     }
+    
+    /**
+     * Always fails: TextLoader does not support conversion to DateTime.
+     * @throws IOException on every call (an ExecException carrying error code 2109).
+     */
+    @Override
+    public DateTime bytesToDateTime(byte[] b) throws IOException {
+        final String reason = "TextLoader does not support conversion to DateTime.";
+        throw new ExecException(reason, 2109, PigException.BUG);
+    }
 
     /**
      * Cast data from bytes to chararray value.  
@@ -209,6 +222,12 @@ public class TextLoader extends LoadFunc
         throw new ExecException(msg, errCode, PigException.BUG);
     }
 
+    public byte[] toBytes(DateTime dt) throws IOException {
+        int errCode = 2109;
+        String msg = "TextLoader does not support conversion from DateTime.";
+        throw new ExecException(msg, errCode, PigException.BUG);
+    }
+
     public byte[] toBytes(Map<String, Object> m) throws IOException {
         int errCode = 2109;
         String msg = "TextLoader does not support conversion from Map.";

Added: pig/trunk/src/org/apache/pig/builtin/ToDate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToDate.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToDate.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToDate.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,112 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * 
+ * <p>ToDate converts the ISO or the customized string or the Unix timestamp to the DateTime object.</p>
+ * <p>ToDate is overloaded.</p>
+ * 
+ * <dl>
+ * <dt><b>Syntax:</b></dt>
+ * <dd><code>DateTime ToDate(Long millis)</code>.</dd>
+ * <dt><b>Input:</b></dt>
+ * <dd><code>the milliseconds</code>.</dd>
+ * <dt><b>Output:</b></dt>
+ * <dd><code>the DateTime object</code>.</dd>
+ * </dl>
+ *
+ * <dl>
+ * <dt><b>Syntax:</b></dt>
+ * <dd><code>DateTime ToDate(String dtStr)</code>.</dd>
+ * <dt><b>Input:</b></dt>
+ * <dd><code>the ISO format date time string</code>.</dd>
+ * <dt><b>Output:</b></dt>
+ * <dd><code>the DateTime object</code>.</dd>
+ * </dl>
+ * 
+ * <dl>
+ * <dt><b>Syntax:</b></dt>
+ * <dd><code>DateTime ToDate(String dtStr, String format)</code>.</dd>
+ * <dt><b>Input:</b></dt>
+ * <dd><code>dtStr: the string that represents a date time</code>.</dd>
+ * <dd><code>format: the format string</code>.</dd>
+ * <dt><b>Output:</b></dt>
+ * <dd><code>the DateTime object</code>.</dd>
+ * </dl>
+ *
+ * <dl>
+ * <dt><b>Syntax:</b></dt>
+ * <dd><code>DateTime ToDate(String dtStr, String format, String timezone)</code>.</dd>
+ * <dt><b>Input:</b></dt>
+ * <dd><code>dtStr: the string that represents a date time</code>.</dd>
+ * <dd><code>format: the format string</code>.</dd>
+ * <dd><code>timezone: the timezone string</code>.</dd>
+ * <dt><b>Output:</b></dt>
+ * <dd><code>the DateTime object</code>.</dd>
+ * </dl>
+ */
+public class ToDate extends EvalFunc<DateTime> {
+
+    public DateTime exec(Tuple input) throws IOException {
+        return new DateTime(DataType.toLong(input.get(0)));
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                .getName().toLowerCase(), input), DataType.DATETIME));
+    }
+
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.LONG));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        funcList.add(new FuncSpec(ToDateISO.class.getClass().getName(), s));
+        s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        funcList.add(new FuncSpec(ToDate2ARGS.class.getClass().getName(), s));
+        s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        funcList.add(new FuncSpec(ToDate3ARGS.class.getClass().getName(), s));
+        return funcList;
+    }
+    
+    public static DateTimeZone extractDateTimeZone(String dtStr) {
+        Pattern pattern = Pattern.compile("(Z|((\\+|-)\\d{2}(:?\\d{2})?))$");
+        Matcher matcher = pattern.matcher(dtStr);
+        if (matcher.find()) {
+            String dtzStr = matcher.group();
+            if (dtzStr.equals("Z")) {
+                return DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null));
+            } else {
+                return DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null));
+            }
+        } else {
+            return null;
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToDate2ARGS.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToDate2ARGS.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToDate2ARGS.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToDate2ARGS.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,30 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * This class should never be used directly; use {@link ToDate} instead.
+ */
+public class ToDate2ARGS extends EvalFunc<DateTime> {
+
+    /**
+     * Parses the first field as a date-time using the pattern given in the second field.
+     *
+     * @param input tuple holding the date-time string and the format pattern
+     * @return the parsed DateTime; null when the tuple is null, has fewer than two
+     *         fields, or either field is null
+     * @throws IOException propagated from reading or converting the fields
+     */
+    @Override
+    public DateTime exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 2
+                || input.get(0) == null || input.get(1) == null) {
+            return null;
+        }
+        String dtStr = DataType.toString(input.get(0));
+        // The zone is deliberately not pre-extracted from dtStr: the timezone in a
+        // customized format is not predictable, so parsing is left to the formatter.
+        DateTimeFormatter dtf = DateTimeFormat.forPattern(DataType
+                .toString(input.get(1)));
+        return dtf.parseDateTime(dtStr);
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToDate3ARGS.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToDate3ARGS.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToDate3ARGS.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToDate3ARGS.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,26 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * This class should never be used directly; use {@link ToDate} instead.
+ */
+public class ToDate3ARGS extends EvalFunc<DateTime> {
+
+    /**
+     * Parses the first field as a date-time using the pattern in the second field and
+     * the zone named by the third field.
+     *
+     * @param input tuple holding the date-time string, the format pattern and the zone ID
+     * @return the parsed DateTime; null when the tuple is null, has fewer than three
+     *         fields, or any of the three fields is null
+     * @throws IOException propagated from reading or converting the fields
+     */
+    @Override
+    public DateTime exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 3 || input.get(0) == null
+                || input.get(1) == null || input.get(2) == null) {
+            return null;
+        }
+        DateTimeFormatter dtf = DateTimeFormat.forPattern(DataType
+                .toString(input.get(1)));
+        // The zone ID is reduced to a fixed offset, taken from getOffset(null).
+        DateTimeZone dtz = DateTimeZone.forOffsetMillis(DateTimeZone.forID(
+                DataType.toString(input.get(2))).getOffset(null));
+        return dtf.withZone(dtz).parseDateTime(DataType.toString(input.get(0)));
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToDateISO.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToDateISO.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToDateISO.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToDateISO.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,26 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+/**
+ * This class should never be used directly; use {@link ToDate} instead.
+ */
+public class ToDateISO extends EvalFunc<DateTime> {
+
+    /**
+     * Builds a DateTime from the string in the first field, using the zone that
+     * ToDate.extractDateTimeZone finds in that string when there is one.
+     *
+     * @param input tuple whose first field is the date-time string
+     * @return the resulting DateTime; null when the tuple is null, empty, or its
+     *         first field is null
+     * @throws IOException propagated from reading or converting the field
+     */
+    @Override
+    public DateTime exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 1 || input.get(0) == null) {
+            return null;
+        }
+        String dtStr = DataType.toString(input.get(0));
+        DateTimeZone dtz = ToDate.extractDateTimeZone(dtStr);
+        if (dtz == null) {
+            return new DateTime(dtStr);
+        } else {
+            return new DateTime(dtStr, dtz);
+        }
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToMilliSeconds.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToMilliSeconds.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToMilliSeconds.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToMilliSeconds.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+
+/**
+ * <p>
+ * ToMilliSeconds converts the DateTime to the number of milliseconds that have passed
+ * since January 1, 1970 00:00:00.000 GMT.
+ * </p>
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Date Format: http://en.wikipedia.org/wiki/ISO_8601</li>
+ * <li>Unix Time: http://en.wikipedia.org/wiki/Unix_time</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dt2:datetime);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dt2: datetime}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,2008-02-01T00:00:00.000Z)
+ * (2008-02-06T02:06:02.000Z,2008-02-01T00:00:00.000Z)
+ * (2007-03-05T03:05:03.000Z,2008-02-01T00:00:00.000Z)
+ * ...
+ *
+ * toMilliSeconds = FOREACH ISOin GENERATE ToMilliSeconds(dt) AS unixTime:long;
+ *
+ * DESCRIBE toMilliSeconds;
+ * toMilliSeconds: {unixTime: long}
+ *
+ * DUMP toMilliSeconds;
+ *
+ * (1231290421000L)
+ * (1202263562000L)
+ * (1173063903000L)
+ * ...
+ *</pre>
+ */
+public class ToMilliSeconds extends EvalFunc<Long> {
+
+    /**
+     * Returns the millisecond instant of the DateTime in the first field.
+     *
+     * @param input tuple whose first field is a DateTime
+     * @return the value of getMillis() for that DateTime; null when the tuple is null,
+     *         empty, or its first field is null
+     * @throws IOException propagated from reading the tuple field
+     */
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 1) {
+            return null;
+        }
+        Object field = input.get(0);
+        // A null field previously surfaced as a NullPointerException; yield null instead.
+        if (field == null) {
+            return null;
+        }
+
+        return ((DateTime) field).getMillis();
+    }
+
+    /**
+     * Declares a single long output field whose name is derived, via getSchemaName,
+     * from the lower-cased class name and the input schema.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.LONG));
+    }
+
+    /**
+     * Registers the only supported argument signature: (datetime).
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.DATETIME))));
+
+        return funcList;
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToString.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToString.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToString.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToString.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,52 @@
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * 
+ * <p>ToString converts a DateTime object to its ISO string or to a string in a customized format.</p>
+ *
+ */
+public class ToString extends EvalFunc<String> {
+
+    public String exec(Tuple input) throws IOException {
+        if (input == null) {
+            return null;
+        }
+        if (input.size() == 1) {
+            return DataType.toDateTime(input.get(0)).toString();
+        } else if (input.size() == 2) {
+            DateTimeFormatter dtf = DateTimeFormat.forPattern(DataType.toString(input.get(1)));
+            return DataType.toDateTime(input.get(0)).toString(dtf);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.CHARARRAY));
+    }
+
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        return funcList;
+    }
+    
+}

Added: pig/trunk/src/org/apache/pig/builtin/ToUnixTime.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ToUnixTime.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ToUnixTime.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/ToUnixTime.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+
+/**
+ * <p>ToUnixTime converts the DateTime to the Unix Time Long</p>
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Date Format: http://en.wikipedia.org/wiki/ISO_8601</li>
+ * <li>Unix Time: http://en.wikipedia.org/wiki/Unix_time</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dt2:datetime);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dt2: datetime}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,2008-02-01T00:00:00.000Z)
+ * (2008-02-06T02:06:02.000Z,2008-02-01T00:00:00.000Z)
+ * (2007-03-05T03:05:03.000Z,2008-02-01T00:00:00.000Z)
+ * ...
+ *
+ * toUnix = FOREACH ISOin GENERATE ToUnixTime(dt) AS unixTime:long;
+ *
+ * DESCRIBE toUnix;
+ * toUnix: {unixTime: long}
+ *
+ * DUMP toUnix;
+ *
+ * (1231290421L)
+ * (1202263562L)
+ * (1173063903L)
+ * ...
+ *</pre>
+ */
+public class ToUnixTime extends EvalFunc<Long> {
+
+    /**
+     * Returns the instant of the DateTime in the first field as whole seconds.
+     *
+     * @param input tuple whose first field is a DateTime
+     * @return getMillis() / 1000 for that DateTime (integer division); null when the
+     *         tuple is null, empty, or its first field is null
+     * @throws IOException propagated from reading the tuple field
+     */
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 1) {
+            return null;
+        }
+        Object field = input.get(0);
+        // A null field previously surfaced as a NullPointerException; yield null instead.
+        if (field == null) {
+            return null;
+        }
+
+        return ((DateTime) field).getMillis() / 1000;
+    }
+
+    /**
+     * Declares a single long output field whose name is derived, via getSchemaName,
+     * from the lower-cased class name and the input schema.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.LONG));
+    }
+
+    /**
+     * Registers the only supported argument signature: (datetime).
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.DATETIME))));
+
+        return funcList;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Fri Aug 24 00:18:05 2012
@@ -27,6 +27,9 @@ import java.util.Map;
 import java.util.Stack;
 import java.util.EmptyStackException;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.LoadStoreCaster;
@@ -309,6 +312,9 @@ public class Utf8StorageConverter implem
         case DataType.BOOLEAN:
             field = bytesToBoolean(b);
             break;
+        case DataType.DATETIME:
+            field = bytesToDateTime(b);
+            break;
         default:
             throw new IOException("Unknown simple data type");
         }
@@ -464,6 +470,28 @@ public class Utf8StorageConverter implem
     }
 
     @Override
+    public DateTime bytesToDateTime(byte[] b) throws IOException {
+        // Null bytes yield null. Text rejected with IllegalArgumentException is
+        // reported as a warning and also yields null rather than failing the load.
+        if (b == null) {
+            return null;
+        }
+        try {
+            String text = new String(b);
+            DateTimeZone zone = ToDate.extractDateTimeZone(text);
+            // Construct in the extracted zone when the text names one.
+            return (zone == null) ? new DateTime(text) : new DateTime(text, zone);
+        } catch (IllegalArgumentException e) {
+            String warning = "Unable to interpret value " + Arrays.toString(b) + " in field being " +
+                    "converted to datetime, caught IllegalArgumentException <" +
+                    e.getMessage() + "> field discarded";
+            LogUtils.warn(this, warning,
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;
+        }
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
     public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
         if(b == null)
@@ -547,6 +575,11 @@ public class Utf8StorageConverter implem
     }
 
     @Override
+    public byte[] toBytes(DateTime dt) throws IOException {
+        return dt.toString().getBytes();
+    }
+
+    @Override
     public byte[] toBytes(Map<String, Object> m) throws IOException {
         return DataType.mapToString(m).getBytes();
     }

Added: pig/trunk/src/org/apache/pig/builtin/WeeksBetween.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/WeeksBetween.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/WeeksBetween.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/WeeksBetween.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Weeks;
+
+/**
+ * <p>WeeksBetween returns the number of weeks between two DateTime objects</p>
+ *
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Date Format: http://en.wikipedia.org/wiki/ISO_8601</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dt2:datetime);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dt2: datetime}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,2008-02-01T00:00:00.000Z)
+ * (2008-02-06T02:06:02.000Z,2008-02-01T00:00:00.000Z)
+ * (2007-03-05T03:05:03.000Z,2008-02-01T00:00:00.000Z)
+ * ...
+ *
+ * diffs = FOREACH ISOin GENERATE YearsBetween(dt, dt2) AS years,
+ * MonthsBetween(dt, dt2) AS months,
+ * WeeksBetween(dt, dt2) AS weeks,
+ * DaysBetween(dt, dt2) AS days,
+ * HoursBetween(dt, dt2) AS hours,
+ * MinutesBetween(dt, dt2) AS mins,
+ * SecondsBetween(dt, dt2) AS secs,
+ * MilliSecondsBetween(dt, dt2) AS millis;
+ *
+ * DESCRIBE diffs;
+ * diffs: {years: long,months: long,weeks: long,days: long,hours: long,mins: long,secs: long,millis: long}
+ *
+ * DUMP diffs;
+ *
+ * (0L,11L,48L,341L,8185L,491107L,29466421L,29466421000L)
+ * (0L,0L,0L,5L,122L,7326L,439562L,439562000L)
+ * (0L,-10L,-47L,-332L,-7988L,-479334L,-28760097L,-28760097000L)
+ *
+ * </pre>
+ */
+public class WeeksBetween extends EvalFunc<Long> {
+
+    /** Milliseconds in one week: 7 days * 24 hours * 3600 seconds * 1000. */
+    private static final long MILLIS_PER_WEEK = 604800000L;
+
+    /**
+     * Computes the whole number of weeks obtained by subtracting the second
+     * DateTime from the first.
+     *
+     * @param input tuple holding two DateTime fields
+     * @return (first - second) in weeks, truncated toward zero and negative when the
+     *         first instant is the earlier one; null when the tuple is null, has fewer
+     *         than two fields, or either field is null
+     * @throws IOException propagated from reading the tuple fields
+     */
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        if (input == null || input.size() < 2) {
+            return null;
+        }
+        Object first = input.get(0);
+        Object second = input.get(1);
+        // A null field previously surfaced as a NullPointerException; yield null instead.
+        if (first == null || second == null) {
+            return null;
+        }
+
+        DateTime startDate = (DateTime) first;
+        DateTime endDate = (DateTime) second;
+
+        // The long subtraction is unchecked and may overflow for extreme instants.
+        return (startDate.getMillis() - endDate.getMillis()) / MILLIS_PER_WEEK;
+    }
+
+    /**
+     * Declares a single long output field whose name is derived, via getSchemaName,
+     * from the lower-cased class name and the input schema.
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.LONG));
+    }
+
+    /**
+     * Registers the only supported argument signature: (datetime, datetime).
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        return funcList;
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/builtin/YearsBetween.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/YearsBetween.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/YearsBetween.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/YearsBetween.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Years;
+
+/**
+ * <p>YearsBetween returns the number of years between two DateTime objects</p>
+ *
+ * <ul>
+ * <li>Jodatime: http://joda-time.sourceforge.net/</li>
+ * <li>ISO8601 Date Format: http://en.wikipedia.org/wiki/ISO_8601</li>
+ * </ul>
+ * <br />
+ * <pre>
+ * Example usage:
+ *
+ * ISOin = LOAD 'test.tsv' USING PigStorage('\t') AS (dt:datetime, dt2:datetime);
+ *
+ * DESCRIBE ISOin;
+ * ISOin: {dt: datetime,dt2: datetime}
+ *
+ * DUMP ISOin;
+ *
+ * (2009-01-07T01:07:01.000Z,2008-02-01T00:00:00.000Z)
+ * (2008-02-06T02:06:02.000Z,2008-02-01T00:00:00.000Z)
+ * (2007-03-05T03:05:03.000Z,2008-02-01T00:00:00.000Z)
+ * ...
+ *
+ * diffs = FOREACH ISOin GENERATE YearsBetween(dt, dt2) AS years,
+ * MonthsBetween(dt, dt2) AS months,
+ * WeeksBetween(dt, dt2) AS weeks,
+ * DaysBetween(dt, dt2) AS days,
+ * HoursBetween(dt, dt2) AS hours,
+ * MinutesBetween(dt, dt2) AS mins,
+ * SecondsBetween(dt, dt2) AS secs,
+ * MilliSecondsBetween(dt, dt2) AS millis;
+ *
+ * DESCRIBE diffs;
+ * diffs: {years: long,months: long,weeks: long,days: long,hours: long,mins: long,secs: long,millis: long}
+ *
+ * DUMP diffs;
+ *
+ * (0L,11L,48L,341L,8185L,491107L,29466421L,29466421000L)
+ * (0L,0L,0L,5L,122L,7326L,439562L,439562000L)
+ * (0L,-10L,-47L,-332L,-7988L,-479334L,-28760097L,-28760097000L)
+ *
+ * </pre>
+ */
+public class YearsBetween extends EvalFunc<Long> {
+    // exec: whole years from the second argument to the first; null when an argument is absent or null.
+    @Override
+    public Long exec(Tuple input) throws IOException
+    {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
+            return null;
+        }
+
+        DateTime startDate = (DateTime) input.get(0);
+        DateTime endDate = (DateTime) input.get(1);
+
+        // Joda counts from its first argument to its second, so this is positive when the first input is later.
+        Years y = Years.yearsBetween(endDate, startDate);
+        // Joda limitation: the count is an int (risk of overflow for extreme ranges); widened to long here.
+        return (long) y.getYears();
+
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.LONG));
+    }
+
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        Schema s = new Schema(); // the single accepted signature: (datetime, datetime)
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        s.add(new Schema.FieldSchema(null, DataType.DATETIME));
+        funcList.add(new FuncSpec(this.getClass().getName(), s));
+        return funcList;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Fri Aug 24 00:18:05 2012
@@ -28,6 +28,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +54,8 @@ import org.apache.pig.impl.util.ObjectSe
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class BinInterSedes implements InterSedes {
+    
+    private static final int ONE_MINUTE = 60000;
 
     public static final byte BOOLEAN_TRUE = 0;
     public static final byte BOOLEAN_FALSE = 1;
@@ -103,6 +108,8 @@ public class BinInterSedes implements In
     public static final byte LONG_0 = 34;
     public static final byte LONG_1 = 35;
 
+    public static final byte DATETIME = 50;
+
     public static final byte TUPLE_0 = 36;
     public static final byte TUPLE_1 = 37;
     public static final byte TUPLE_2 = 38;
@@ -375,6 +382,9 @@ public class BinInterSedes implements In
         case LONG:
             return Long.valueOf(in.readLong());
 
+        case DATETIME:
+            return new DateTime(in.readLong(), DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));
+
         case FLOAT:
             return Float.valueOf(in.readFloat());
 
@@ -496,6 +506,12 @@ public class BinInterSedes implements In
             }
             break;
 
+        case DataType.DATETIME:
+            out.writeByte(DATETIME);
+            out.writeLong(((DateTime) val).getMillis());
+            out.writeShort(((DateTime) val).getZone().getOffset((DateTime) val) / ONE_MINUTE);
+            break;
+            
         case DataType.FLOAT:
             out.writeByte(FLOAT);
             out.writeFloat((Float) val);
@@ -809,6 +825,18 @@ public class BinInterSedes implements In
                 }
                 break;
             }
+            case BinInterSedes.DATETIME: {
+                type1 = DataType.DATETIME;
+                type2 = getGeneralizedDataType(dt2);
+                if (type1 == type2) {
+                    long lv1 = bb1.getLong();
+                    bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
+                    long lv2 = bb2.getLong();
+                    bb2.position(bb2.position() + 2);
+                    rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
+                }
+                break;
+            }
             case BinInterSedes.FLOAT: {
                 type1 = DataType.FLOAT;
                 type2 = getGeneralizedDataType(dt2);
@@ -1108,6 +1136,8 @@ public class BinInterSedes implements In
             case BinInterSedes.LONG_ININT:
             case BinInterSedes.LONG:
                 return DataType.LONG;
+            case BinInterSedes.DATETIME:
+                return DataType.DATETIME;
             case BinInterSedes.FLOAT:
                 return DataType.FLOAT;
             case BinInterSedes.DOUBLE:

Modified: pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Fri Aug 24 00:18:05 2012
@@ -25,6 +25,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,6 +50,7 @@ public class DataReaderWriter {
     private static BagFactory mBagFactory = BagFactory.getInstance();
     static final int UNSIGNED_SHORT_MAX = 65535;
     public static final String UTF8 = "UTF-8";
+    private static final int ONE_MINUTE = 60000;
 
     public static Tuple bytesToTuple(DataInput in) throws IOException {
         // Don't use Tuple.readFields, because it requires you to
@@ -178,6 +182,9 @@ public class DataReaderWriter {
 
             case DataType.BYTE:
                 return Byte.valueOf(in.readByte());
+                
+            case DataType.DATETIME:
+                return new DateTime(in.readLong(), DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));
 
             case DataType.BYTEARRAY: {
                 int size = in.readInt();
@@ -290,6 +297,11 @@ public class DataReaderWriter {
                 out.writeByte((Byte)val);
                 break;
 
+            case DataType.DATETIME:
+                out.writeByte(DataType.DATETIME);
+                out.writeLong(((DateTime)val).getMillis());
+                out.writeShort(((DateTime)val).getZone().getOffset((DateTime)val) / ONE_MINUTE); // zone offset stored in minutes
+                break; // must not fall through: the next arm casts val to DataByteArray
             case DataType.BYTEARRAY: {
                 out.writeByte(DataType.BYTEARRAY);
                 DataByteArray bytes = (DataByteArray)val;

Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Fri Aug 24 00:18:05 2012
@@ -26,6 +26,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.pig.classification.InterfaceAudience;
@@ -33,6 +36,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.ToDate;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
@@ -63,6 +67,7 @@ public class DataType {
     public static final byte LONG      =  15;
     public static final byte FLOAT     =  20;
     public static final byte DOUBLE    =  25;
+    public static final byte DATETIME  =  30;
     public static final byte BYTEARRAY =  50;
     public static final byte CHARARRAY =  55;
     /**
@@ -119,6 +124,8 @@ public class DataType {
             return DOUBLE;
         } else if (o instanceof Boolean) {
             return BOOLEAN;
+        } else if (o instanceof DateTime) {
+            return DATETIME;
         } else if (o instanceof Byte) {
             return BYTE;
         } else if (o instanceof WritableComparable) {
@@ -154,6 +161,8 @@ public class DataType {
             return BOOLEAN;
         } else if (t == Byte.class) {
             return BYTE;
+        } else if (t == DateTime.class) {
+            return DATETIME;
         } else if (t == InternalMap.class) {
             return INTERNALMAP;
         } else {
@@ -218,7 +227,7 @@ public class DataType {
      */
     public static byte[] genAllTypes(){
         byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY,
-                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT,
+                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, DataType.DATETIME,
                 DataType.GENERIC_WRITABLECOMPARABLE,
                 DataType.INTEGER, DataType.INTERNALMAP,
                 DataType.LONG, DataType.MAP, DataType.TUPLE};
@@ -227,7 +236,7 @@ public class DataType {
 
     private static String[] genAllTypeNames(){
         String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY",
-                "CHARARRAY", "DOUBLE", "FLOAT",
+                "CHARARRAY", "DOUBLE", "FLOAT", "DATETIME",
                 "GENERIC_WRITABLECOMPARABLE",
                 "INTEGER","INTERNALMAP",
                 "LONG", "MAP", "TUPLE" };
@@ -285,6 +294,7 @@ public class DataType {
         case LONG:      return "long";
         case FLOAT:     return "float";
         case DOUBLE:    return "double";
+        case DATETIME:  return "datetime";
         case BYTEARRAY: return "bytearray";
         case BIGCHARARRAY: return "bigchararray";
         case CHARARRAY: return "chararray";
@@ -332,6 +342,7 @@ public class DataType {
                 (dataType == DOUBLE) ||
                 (dataType == BOOLEAN) ||
                 (dataType == BYTE) ||
+                (dataType == DATETIME) ||
                 (dataType == GENERIC_WRITABLECOMPARABLE));
     }
 
@@ -369,7 +380,7 @@ public class DataType {
      * because there's no super class that implements compareTo.  This
      * function provides an (arbitrary) ordering of objects of different
      * types as follows:  NULL &lt; BOOLEAN &lt; BYTE &lt; INTEGER &lt; LONG &lt;
-     * FLOAT &lt; DOUBLE * &lt; BYTEARRAY &lt; STRING &lt; MAP &lt;
+     * FLOAT &lt; DOUBLE &lt; DATETIME &lt; BYTEARRAY &lt; STRING &lt; MAP &lt;
      * TUPLE &lt; BAG.  No other functions should implement this cross
      * object logic.  They should call this function for it instead.
      * @param o1 First object
@@ -420,6 +431,9 @@ public class DataType {
             case DOUBLE:
                 return ((Double)o1).compareTo((Double)o2);
 
+            case DATETIME:
+                return ((DateTime)o1).compareTo((DateTime)o2);
+
             case BYTEARRAY:
                 return ((DataByteArray)o1).compareTo(o2);
 
@@ -504,6 +518,9 @@ public class DataType {
         case LONG:
             return ((Number) o).toString().getBytes();
 
+        case DATETIME:
+            return ((DateTime) o).toString().getBytes();
+
         case CHARARRAY:
             return ((String) o).getBytes();
         case MAP:
@@ -574,6 +591,7 @@ public class DataType {
                 } else {
                     return null;
                 }
+            case DATETIME:
             case MAP:
             case INTERNALMAP:
             case TUPLE:
@@ -649,6 +667,9 @@ public class DataType {
 			case NULL:
 			    return null;
 
+			case DATETIME:
+			    return Integer.valueOf(Long.valueOf(((DateTime)o).getMillis()).intValue());
+
 			case MAP:
 			case INTERNALMAP:
 			case TUPLE:
@@ -738,6 +759,8 @@ public class DataType {
 			case NULL:
 			    return null;
 
+			case DATETIME:
+			    return Long.valueOf(((DateTime)o).getMillis());
 			case MAP:
 			case INTERNALMAP:
 			case TUPLE:
@@ -812,6 +835,9 @@ public class DataType {
 			case DOUBLE:
 			    return new Float(((Double)o).floatValue());
 
+	         case DATETIME:
+	             return new Float(Long.valueOf(((DateTime)o).getMillis()).floatValue());
+
 			case BYTEARRAY:
 			    return Float.valueOf(((DataByteArray)o).toString());
 
@@ -895,7 +921,10 @@ public class DataType {
 			case DOUBLE:
 			    return (Double)o;
 
-			case BYTEARRAY:
+            case DATETIME:
+                return new Double(Long.valueOf(((DateTime)o).getMillis()).doubleValue());
+
+            case BYTEARRAY:
 			    return Double.valueOf(((DataByteArray)o).toString());
 
 			case CHARARRAY:
@@ -930,6 +959,78 @@ public class DataType {
 			throw new ExecException(msg, errCode, PigException.BUG);
 		}
     }
+    
+    /**
+     * Force a data object to a DateTime, if possible. CharArray and ByteArray
+     * values are parsed as date/time strings; Integer, Long, Float and Double
+     * values are used as a millisecond instant. Boolean, Byte and complex types
+     * cannot be forced to a DateTime. This isn't particularly efficient, so if you
+     * already <b>know</b> that the object you have is a DateTime you should just cast it.
+     * 
+     * @param o
+     *            object to cast
+     * @param type
+     *            of the object you are casting
+     * @return The object as a DateTime, or null if the type is NULL.
+     * @throws ExecException
+     *             if the type can't be forced to a DateTime.
+     */
+    public static DateTime toDateTime(Object o, byte type) throws ExecException {
+        try {
+            switch (type) {
+            case NULL:
+                return null;
+            case BYTEARRAY:
+                return new DateTime(((DataByteArray) o).toString());
+            case CHARARRAY:
+                // the string can contain just date part or date part plus time part; honour its zone when one is found
+                DateTimeZone dtz = ToDate.extractDateTimeZone((String) o);
+                if (dtz == null) {
+                    return new DateTime((String) o);
+                } else {
+                    return new DateTime((String) o, dtz);
+                }
+            case INTEGER:
+                return new DateTime(((Integer) o).longValue());
+            case LONG:
+                return new DateTime(((Long) o).longValue());
+            case FLOAT:
+                return new DateTime(((Float) o).longValue());
+            case DOUBLE:
+                return new DateTime(((Double) o).longValue());
+            case DATETIME:
+                return (DateTime) o;
+
+            case BOOLEAN:
+            case BYTE:
+            case MAP:
+            case INTERNALMAP:
+            case TUPLE:
+            case BAG:
+            case UNKNOWN:
+            default:
+                int errCode = 1071;
+                String msg = "Cannot convert a " + findTypeName(o) + " to a DateTime";
+                throw new ExecException(msg, errCode, PigException.INPUT);
+            }
+        } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (NumberFormatException nfe) {
+            int errCode = 1074;
+            String msg = "Problem with formatting. Could not convert " + o + " to DateTime.";
+            throw new ExecException(msg, errCode, PigException.INPUT, nfe);
+        } catch (Exception e) {
+            int errCode = 2054;
+            String msg = "Internal error. Could not convert " + o + " to DateTime.";
+            throw new ExecException(msg, errCode, PigException.BUG, e); // keep the cause for diagnosis
+        }
+    }
+    
+    public static DateTime toDateTime(Object o) throws ExecException {
+        return toDateTime(o, findType(o));
+    }
 
     /**
      * Force a data object to a Double, if possible.  Any numeric type
@@ -973,6 +1074,9 @@ public class DataType {
 
 			case DOUBLE:
 			    return ((Double)o).toString();
+    
+			case DATETIME:
+			    return ((DateTime)o).toString();
 
 			case BYTEARRAY:
 			    return ((DataByteArray)o).toString();
@@ -1315,6 +1419,7 @@ public class DataType {
         case LONG:
         case FLOAT:
         case DOUBLE:
+        case DATETIME:
         case BYTEARRAY:
         case CHARARRAY:
         case MAP:

Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Fri Aug 24 00:18:05 2012
@@ -323,6 +323,13 @@ public class DefaultTuple extends Abstra
                             double dv2 = bb2.getDouble();
                             rc = Double.compare(dv1, dv2);
                             break;
+                        case DataType.DATETIME:
+                            long dtv1 = bb1.getLong();
+                            bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
+                            long dtv2 = bb2.getLong();
+                            bb2.position(bb2.position() + 2);
+                            rc = (dtv1 < dtv2 ? -1 : (dtv1 == dtv2 ? 0 : 1));
+                            break;
                         case DataType.BYTEARRAY:
                             int basz1 = bb1.getInt();
                             int basz2 = bb2.getInt();

Modified: pig/trunk/src/org/apache/pig/data/SchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTuple.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTuple.java Fri Aug 24 00:18:05 2012
@@ -24,6 +24,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.classification.InterfaceAudience;
@@ -51,6 +54,7 @@ import com.google.common.collect.Lists;
 public abstract class SchemaTuple<T extends SchemaTuple<T>> extends AbstractTuple implements TypeAwareTuple {
 
     private static final long serialVersionUID = 1L;
+    private static final int ONE_MINUTE = 60000;
     private static final BinInterSedes bis = new BinInterSedes();
 
     @NotImplemented
@@ -173,6 +177,11 @@ public abstract class SchemaTuple<T exte
         out.writeDouble(v);
     }
 
+    protected static void write(DataOutput out, DateTime v) throws IOException {
+        out.writeLong(v.getMillis());
+        out.writeShort(v.getZone().getOffset(v) / ONE_MINUTE);
+    }
+
     protected static void write(DataOutput out, byte[] v) throws IOException {
         SedesHelper.writeBytes(out, v);
     }
@@ -210,6 +219,10 @@ public abstract class SchemaTuple<T exte
         return in.readDouble();
     }
 
+    protected static DateTime read(DataInput in, DateTime v) throws IOException {
+        return new DateTime(in.readLong(), DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));
+    }
+
     protected static String read(DataInput in, String v) throws IOException {
         return SedesHelper.readChararray(in, in.readByte());
     }
@@ -373,6 +386,10 @@ public abstract class SchemaTuple<T exte
         return unbox((Boolean)v);
     }
 
+    protected DateTime unbox(Object v, DateTime t) {
+        return (DateTime)v;
+    }
+
     protected String unbox(Object v, String t) {
         return (String)v;
     }
@@ -416,6 +433,10 @@ public abstract class SchemaTuple<T exte
         return v.booleanValue();
     }
 
+    protected DateTime unbox(DateTime v) {
+        return v;
+    }
+
     protected DataBag box(DataBag v) {
         return v;
     }
@@ -459,6 +480,10 @@ public abstract class SchemaTuple<T exte
         return new Boolean(v);
     }
 
+    protected DateTime box(DateTime v) {
+        return v;
+    }
+
     protected int hashCodePiece(int hash, int v, boolean isNull) {
         return isNull ? hash : 31 * hash + v;
     }
@@ -480,6 +505,10 @@ public abstract class SchemaTuple<T exte
         return isNull ? hash : 31 * hash + (v ? 1231 : 1237);
     }
 
+    protected int hashCodePiece(int hash, DateTime v, boolean isNull) {
+        return isNull ? hash : 31 * hash + v.hashCode();
+    }
+
     protected int hashCodePiece(int hash, byte[] v, boolean isNull) {
         return isNull ? hash : 31 * hash + DataByteArray.hashCode(v);
     }
@@ -579,6 +608,13 @@ public abstract class SchemaTuple<T exte
     protected abstract void generatedCodeSetBoolean(int fieldNum, boolean val) throws ExecException;
 
     @Override
+    public void setDateTime(int fieldNum, DateTime val) throws ExecException {
+         generatedCodeSetDateTime(fieldNum, val);
+    }
+
+    protected abstract void generatedCodeSetDateTime(int fieldNum, DateTime val) throws ExecException;
+
+    @Override
     public void setString(int fieldNum, String val) throws ExecException {
          generatedCodeSetString(fieldNum, val);
     }
@@ -644,6 +680,11 @@ public abstract class SchemaTuple<T exte
         return val;
     }
 
+    protected DateTime returnUnlessNull(boolean isNull, DateTime val) throws FieldIsNullException {
+        errorIfNull(isNull, "DateTime");
+        return val;
+    }
+
     protected Tuple returnUnlessNull(boolean isNull, Tuple val) throws FieldIsNullException {
         errorIfNull(isNull, "Tuple");
         return val;
@@ -722,6 +763,17 @@ public abstract class SchemaTuple<T exte
 
     public boolean unboxBoolean(Object val) {
         return ((Boolean)val).booleanValue();
+    }    
+
+    @Override
+    public DateTime getDateTime(int fieldNum) throws ExecException {
+        return generatedCodeGetDateTime(fieldNum);
+    }
+
+    protected abstract DateTime generatedCodeGetDateTime(int fieldNum) throws ExecException;
+
+    public DateTime unboxDateTime(Object val) {
+        return (DateTime)val;
     }
 
     @Override
@@ -1027,6 +1079,33 @@ public abstract class SchemaTuple<T exte
         return compare(isNull, val, themNull, themVal);
     }
 
+    protected int compare(boolean usNull, DateTime usVal, boolean themNull, DateTime themVal) {
+        if (usNull && themNull) {
+            return 0;
+        } else if (themNull) {
+            return 1;
+        } else if (usNull) {
+            return -1;
+        }
+        return compare(usVal, themVal);
+    }
+
+    protected int compare(DateTime val, DateTime themVal) {
+        return val.compareTo(themVal);
+    }
+
+    protected int compareWithElementAtPos(boolean isNull, DateTime val, SchemaTuple<?> t, int pos) {
+        DateTime themVal; // the other tuple's DateTime at pos
+        boolean themNull; // whether that field is null in the other tuple
+        try {
+            themVal = t.getDateTime(pos);
+            themNull = t.isNull(pos);
+        } catch (ExecException e) {
+            throw new RuntimeException("Unable to retrieve DateTime field " + pos + " in given Tuple: " + t, e);
+        }
+        return compare(isNull, val, themNull, themVal); // null-aware comparison, then DateTime.compareTo
+    }
+
     protected int compare(boolean usNull, String usVal, boolean themNull, String themVal) {
         if (usNull && themNull) {
             return 0;
@@ -1158,4 +1237,4 @@ public abstract class SchemaTuple<T exte
     protected abstract void generatedCodeReadFields(DataInput in, boolean[] nulls) throws IOException;
 
     protected abstract boolean[] generatedCodeNullsArray() throws IOException;
-}
\ No newline at end of file
+}

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Fri Aug 24 00:18:05 2012
@@ -729,6 +729,8 @@ public class SchemaTupleClassGenerator {
                 if (booleans++ % 8 == 0) {
                     size++; //accounts for the byte used to store boolean values
                 }
+            } else if (isDateTime()) {
+                size += 10; // 8 for long and 2 for short
             } else if (isBag()) {
                 size += 8; //the ptr
                 s += "(pos_"+fieldPos+" == null ? 0 : pos_"+fieldPos+".getMemorySize()) + ";
@@ -764,6 +766,7 @@ public class SchemaTupleClassGenerator {
             case (DataType.FLOAT): add("    return 0.0f;"); break;
             case (DataType.DOUBLE): add("    return 0.0;"); break;
             case (DataType.BOOLEAN): add("    return true;"); break;
+            case (DataType.DATETIME): add("    return new DateTime();"); break;
             case (DataType.BYTEARRAY): add("    return (byte[])null;"); break;
             case (DataType.CHARARRAY): add("    return (String)null;"); break;
             case (DataType.TUPLE): add("    return (Tuple)null;"); break;
@@ -1032,6 +1035,7 @@ public class SchemaTupleClassGenerator {
             listOfFutureMethods.add(new TypeAwareSetString(DataType.BYTEARRAY));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.CHARARRAY));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.BOOLEAN));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.DATETIME));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.TUPLE));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.BAG));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.MAP));
@@ -1042,6 +1046,7 @@ public class SchemaTupleClassGenerator {
             listOfFutureMethods.add(new TypeAwareGetString(DataType.BYTEARRAY));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.CHARARRAY));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.BOOLEAN));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.DATETIME));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.TUPLE));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.BAG));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.MAP));
@@ -1069,6 +1074,8 @@ public class SchemaTupleClassGenerator {
                     .append("\n")
                     .append("import com.google.common.collect.Lists;\n")
                     .append("\n")
+                    .append("import org.joda.time.DateTime;")
+                    .append("\n")
                     .append("import org.apache.pig.data.DataType;\n")
                     .append("import org.apache.pig.data.DataBag;\n")
                     .append("import org.apache.pig.data.Tuple;\n")
@@ -1187,6 +1194,10 @@ public class SchemaTupleClassGenerator {
             return type == DataType.DOUBLE;
         }
 
+        public boolean isDateTime() {
+            return type == DataType.DATETIME;
+        }
+
         public boolean isPrimitive() {
             return isInt() || isLong() || isFloat() || isDouble() || isBoolean();
         }
@@ -1232,6 +1243,7 @@ public class SchemaTupleClassGenerator {
                 case (DataType.BYTEARRAY): return "byte[]";
                 case (DataType.CHARARRAY): return "String";
                 case (DataType.BOOLEAN): return "boolean";
+                case (DataType.DATETIME): return "DateTime";
                 case (DataType.TUPLE): return "Tuple";
                 case (DataType.BAG): return "DataBag";
                 case (DataType.MAP): return "Map";
@@ -1247,4 +1259,4 @@ public class SchemaTupleClassGenerator {
             }
         }
     }
-}
\ No newline at end of file
+}

Modified: pig/trunk/src/org/apache/pig/data/SizeUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SizeUtil.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SizeUtil.java (original)
+++ pig/trunk/src/org/apache/pig/data/SizeUtil.java Fri Aug 24 00:18:05 2012
@@ -67,6 +67,9 @@ public class SizeUtil {
         case DataType.LONG:
             return 8 + 8;
 
+        case DataType.DATETIME:
+            return 8 + 2 + 8 + 6 /* one long (8) + one short (2) + 6 to round to 8 bytes */;
+
         case DataType.MAP: {
             @SuppressWarnings("unchecked")
             Map<String, Object> m = (Map<String, Object>) o;

Modified: pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java Fri Aug 24 00:18:05 2012
@@ -19,6 +19,8 @@ package org.apache.pig.data;
 
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -34,6 +36,7 @@ public interface TypeAwareTuple extends 
     public void setTuple(int idx, Tuple val) throws ExecException;
     public void setDataBag(int idx, DataBag val) throws ExecException;
     public void setMap(int idx, Map<String,Object> val) throws ExecException;
+    public void setDateTime(int idx, DateTime val) throws ExecException;
 
     public int getInt(int idx) throws ExecException, FieldIsNullException;
     public float getFloat(int idx) throws ExecException, FieldIsNullException;
@@ -45,6 +48,7 @@ public interface TypeAwareTuple extends 
     public Tuple getTuple(int idx) throws ExecException;
     public DataBag getDataBag(int idx) throws ExecException, FieldIsNullException;
     public Map<String,Object> getMap(int idx) throws ExecException, FieldIsNullException;
+    public DateTime getDateTime(int idx) throws ExecException, FieldIsNullException;
 
     public Schema getSchema();
 

Added: pig/trunk/src/org/apache/pig/impl/io/NullableDateTimeWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableDateTimeWritable.java?rev=1376800&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableDateTimeWritable.java (added)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableDateTimeWritable.java Fri Aug 24 00:18:05 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.io;
+
+import org.joda.time.DateTime;
+
+import org.apache.pig.backend.hadoop.DateTimeWritable; 
+
+public class NullableDateTimeWritable extends PigNullableWritable {
+    /** Creates an instance whose value is a default-constructed DateTimeWritable. */
+    public NullableDateTimeWritable() {
+        mValue = new DateTimeWritable();
+    }
+    /** Creates an instance whose value is a DateTimeWritable built from {@code dt}. */
+    public NullableDateTimeWritable(DateTime dt) {
+        mValue = new DateTimeWritable(dt);
+    }
+    /** Returns null when isNull() is true, otherwise the result of get() on the held DateTimeWritable. */
+    public Object getValueAsPigType() {
+        return isNull() ? null : ((DateTimeWritable)mValue).get();
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Aug 24 00:18:05 2012
@@ -256,11 +256,16 @@ public class Schema implements Serializa
                 }
                 else if (DataType.isNumberType(inputType) && (castType == DataType.CHARARRAY
                         || castType == DataType.BYTEARRAY || DataType.isNumberType(castType)
-                        || castType == DataType.BOOLEAN)) {
+                        || castType == DataType.BOOLEAN || castType == DataType.DATETIME)) {
+                    // good
+                }
+                else if (inputType == DataType.DATETIME && (castType == DataType.CHARARRAY
+                        || castType == DataType.BYTEARRAY || DataType.isNumberType(castType))) {
                     // good
                 }
                 else if (inputType == DataType.CHARARRAY && (castType == DataType.BYTEARRAY
-                        || DataType.isNumberType(castType) || castType == DataType.BOOLEAN)) {
+                        || DataType.isNumberType(castType) || castType == DataType.BOOLEAN
+                        || castType == DataType.DATETIME)) {
                     // good
                 } 
                 else if (inputType == DataType.BYTEARRAY) {

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java Fri Aug 24 00:18:05 2012
@@ -50,6 +50,7 @@ public class SchemaUtil {
         SUPPORTED_TYPE_SET.add(DataType.BYTEARRAY);
         SUPPORTED_TYPE_SET.add(DataType.DOUBLE);
         SUPPORTED_TYPE_SET.add(DataType.FLOAT);
+        SUPPORTED_TYPE_SET.add(DataType.DATETIME);
         SUPPORTED_TYPE_SET.add(DataType.MAP);
     }
 

Modified: pig/trunk/src/org/apache/pig/impl/util/CastUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/CastUtils.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/CastUtils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/CastUtils.java Fri Aug 24 00:18:05 2012
@@ -56,6 +56,7 @@ public class CastUtils {
 	    case (DataType.INTEGER): return caster.bytesToInteger(bytes);
 	    case (DataType.LONG): return caster.bytesToLong(bytes);
 	    case (DataType.BOOLEAN): return caster.bytesToBoolean(bytes);
+	    case (DataType.DATETIME): return caster.bytesToDateTime(bytes);
 	    case (DataType.MAP): return caster.bytesToMap(bytes);
 	    case (DataType.TUPLE): return caster.bytesToTuple(bytes, fieldSchema);
 	    default: throw new IOException("Unknown type " + dataType);

Modified: pig/trunk/src/org/apache/pig/impl/util/NumValCarrier.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/NumValCarrier.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/NumValCarrier.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/NumValCarrier.java Fri Aug 24 00:18:05 2012
@@ -41,6 +41,7 @@ public class NumValCarrier {
         byteToStr.put(DataType.INTEGER,valCarrier);
         byteToStr.put(DataType.LONG,valCarrier);
         byteToStr.put(DataType.BOOLEAN,valCarrier);
+        byteToStr.put(DataType.DATETIME,valCarrier);
         byteToStr.put(DataType.MAP,mapCarrier);
         byteToStr.put(DataType.TUPLE,tupleCarrier);
         byteToStr.put(DataType.NULL,nullCarrier);

Modified: pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/StorageUtil.java Fri Aug 24 00:18:05 2012
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import org.apache.hadoop.io.Text;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -135,6 +137,10 @@ public final class StorageUtil {
             out.write(((Double)field).toString().getBytes());
             break;
 
+        case DataType.DATETIME:
+            out.write(((DateTime)field).toString().getBytes());
+            break;
+
         case DataType.BYTEARRAY: 
             byte[] b = ((DataByteArray)field).get();
             out.write(b, 0, b.length);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Aug 24 00:18:05 2012
@@ -353,6 +353,10 @@ public class TypeCheckingExpVisitor exte
                 insertCast(binOp, biggerType, binOp.getRhs());
             }
         }
+        else if ( (lhsType == DataType.DATETIME) &&
+                (rhsType == DataType.DATETIME) ) {
+            // good
+        }
         else if ( (lhsType == DataType.CHARARRAY) &&
                 (rhsType == DataType.CHARARRAY) ) {
             // good
@@ -362,13 +366,13 @@ public class TypeCheckingExpVisitor exte
             // good
         }
         else if ( (lhsType == DataType.BYTEARRAY) &&
-                ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) || (rhsType == DataType.BOOLEAN))
+                ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) || (rhsType == DataType.BOOLEAN) || (rhsType == DataType.DATETIME))
         ) {
             // Cast byte array to the type on rhs
             insertCast(binOp, rhsType, binOp.getLhs());
         }
         else if ( (rhsType == DataType.BYTEARRAY) &&
-                ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) || (lhsType == DataType.BOOLEAN))
+                ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) || (lhsType == DataType.BOOLEAN) || (lhsType == DataType.DATETIME))
         ) {
             // Cast byte array to the type on lhs
             insertCast(binOp, lhsType, binOp.getRhs());
@@ -559,12 +563,12 @@ public class TypeCheckingExpVisitor exte
         } 
         else if ((lhsType == DataType.BYTEARRAY)
                 && ((rhsType == DataType.CHARARRAY) || (DataType
-                        .isNumberType(rhsType)))) {
+                        .isNumberType(rhsType)) || (rhsType == DataType.DATETIME))) { // need to add boolean as well
             // Cast byte array to the type on rhs
             insertCast(binCond, rhsType, binCond.getLhs());
         } else if ((rhsType == DataType.BYTEARRAY)
                 && ((lhsType == DataType.CHARARRAY) || (DataType
-                        .isNumberType(lhsType)))) {
+                        .isNumberType(lhsType)) || (lhsType == DataType.DATETIME))) { // need to add boolean as well
             // Cast byte array to the type on lhs
             insertCast(binCond, lhsType, binCond.getRhs());
         }
@@ -1197,7 +1201,7 @@ public class TypeCheckingExpVisitor exte
         //Ordering here decides the score for the best fit function.
         //Do not change the order. Conversions to a smaller type is preferred
         //over conversion to a bigger type where ordering of types is:
-        //INTEGER, LONG, FLOAT, DOUBLE, CHARARRAY, TUPLE, BAG, MAP
+        //INTEGER, LONG, FLOAT, DOUBLE, DATETIME, CHARARRAY, TUPLE, BAG, MAP
         //from small to big
         
         List<Byte> boolToTypes = Arrays.asList(
@@ -1233,6 +1237,7 @@ public class TypeCheckingExpVisitor exte
                 DataType.LONG,
                 DataType.FLOAT,
                 DataType.DOUBLE,
+                DataType.DATETIME,
                 DataType.CHARARRAY,
                 DataType.TUPLE,
                 DataType.BAG,
@@ -1353,6 +1358,9 @@ public class TypeCheckingExpVisitor exte
         case DataType.BOOLEAN:
             kind = PigWarning.IMPLICIT_CAST_TO_BOOLEAN;
             break;
+        case DataType.DATETIME:
+            kind = PigWarning.IMPLICIT_CAST_TO_DATETIME;
+            break;
         case DataType.MAP:
             kind = PigWarning.IMPLICIT_CAST_TO_MAP;
             break;

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Fri Aug 24 00:18:05 2012
@@ -210,7 +210,7 @@ type : simple_type | tuple_type | bag_ty
 ;
 
 simple_type 
-    : BOOLEAN | INT | LONG | FLOAT | DOUBLE | CHARARRAY | BYTEARRAY 
+    : BOOLEAN | INT | LONG | FLOAT | DOUBLE | DATETIME | CHARARRAY | BYTEARRAY 
 ;
 
 tuple_type 
@@ -636,6 +636,7 @@ eid : rel_str_op
     | LONG
     | FLOAT
     | DOUBLE
+    | DATETIME
     | CHARARRAY
     | BYTEARRAY
     | BAG

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Fri Aug 24 00:18:05 2012
@@ -176,6 +176,7 @@ simple_type 
     | LONG { sb.append($LONG.text); }
     | FLOAT { sb.append($FLOAT.text); }
     | DOUBLE { sb.append($DOUBLE.text); }
+    | DATETIME { sb.append($DATETIME.text); }
     | CHARARRAY { sb.append($CHARARRAY.text); }
     | BYTEARRAY { sb.append($BYTEARRAY.text); }
 ;
@@ -618,6 +619,7 @@ eid : rel_str_op
     | LONG      { sb.append($LONG.text); }
     | FLOAT     { sb.append($FLOAT.text); }
     | DOUBLE    { sb.append($DOUBLE.text); }
+    | DATETIME  { sb.append($DATETIME.text); }
     | CHARARRAY { sb.append($CHARARRAY.text); }
     | BYTEARRAY { sb.append($BYTEARRAY.text); }
     | BAG       { sb.append($BAG.text); }

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Fri Aug 24 00:18:05 2012
@@ -230,6 +230,7 @@ simple_type returns [byte typev]
   | LONG { $typev = DataType.LONG; }
   | FLOAT { $typev = DataType.FLOAT; }
   | DOUBLE { $typev = DataType.DOUBLE; }
+  | DATETIME { $typev = DataType.DATETIME; }
   | CHARARRAY { $typev = DataType.CHARARRAY; }
   | BYTEARRAY { $typev = DataType.BYTEARRAY; }
 ;
@@ -628,6 +629,7 @@ eid : rel_str_op
     | LONG
     | FLOAT
     | DOUBLE
+    | DATETIME
     | CHARARRAY
     | BYTEARRAY
     | BAG

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Fri Aug 24 00:18:05 2012
@@ -386,6 +386,7 @@ simple_type returns[byte datatype]
  | LONG { $datatype = DataType.LONG; }
  | FLOAT { $datatype = DataType.FLOAT; }
  | DOUBLE { $datatype = DataType.DOUBLE; }
+ | DATETIME { $datatype = DataType.DATETIME; }
  | CHARARRAY { $datatype = DataType.CHARARRAY; }
  | BYTEARRAY { $datatype = DataType.BYTEARRAY; }
 ;
@@ -1733,6 +1734,7 @@ eid returns[String id] : rel_str_op { $i
     | LONG { $id = $LONG.text; }
     | FLOAT { $id = $FLOAT.text; }
     | DOUBLE { $id = $DOUBLE.text; }
+    | DATETIME { $id = $DATETIME.text; }
     | CHARARRAY { $id = $CHARARRAY.text; }
     | BYTEARRAY { $id = $BYTEARRAY.text; }
     | BAG { $id = $BAG.text; }

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Fri Aug 24 00:18:05 2012
@@ -180,6 +180,9 @@ FLOAT : 'FLOAT'
 DOUBLE : 'DOUBLE'
 ;
 
+DATETIME : 'DATETIME'
+;
+
 CHARARRAY : 'CHARARRAY'
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Fri Aug 24 00:18:05 2012
@@ -317,7 +317,7 @@ field_def_list : field_def ( COMMA field
 type : simple_type | tuple_type | bag_type | map_type
 ;
 
-simple_type : BOOLEAN | INT | LONG | FLOAT | DOUBLE | CHARARRAY | BYTEARRAY
+simple_type : BOOLEAN | INT | LONG | FLOAT | DOUBLE | DATETIME | CHARARRAY | BYTEARRAY
 ;
 
 tuple_type : TUPLE? LEFT_PAREN field_def_list? RIGHT_PAREN
@@ -391,16 +391,13 @@ or_cond : and_cond  ( OR^ and_cond )*
 and_cond : unary_cond ( AND^ unary_cond )*
 ;
 
-unary_cond : expr rel_op^ expr
-           | LEFT_PAREN! cond RIGHT_PAREN!
-           | not_cond           
+unary_cond : LEFT_PAREN! cond RIGHT_PAREN!
+           | not_cond
+           | expr rel_op^ expr
            | func_eval
            | null_check_cond
-           | bool_cond           
 ;
 
-bool_cond: expr -> ^(BOOL_COND expr);
-
 not_cond : NOT^ unary_cond
 ;
 
@@ -767,6 +764,7 @@ eid : rel_str_op
     | LONG
     | FLOAT
     | DOUBLE
+    | DATETIME
     | CHARARRAY
     | BYTEARRAY
     | BAG
@@ -794,7 +792,6 @@ eid : rel_str_op
     | TRUE
     | FALSE
     | REALIAS
-    | BOOL_COND
 ;
 
 // relational operator

Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Fri Aug 24 00:18:05 2012
@@ -28,6 +28,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.Collection;
 
+import org.joda.time.DateTime;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -1212,6 +1214,19 @@ public class AugmentBaseDataVisitor exte
             return Float.valueOf((Float) v - 1);
         case DataType.DOUBLE:
             return Double.valueOf((Double) v - 1);
+        case DataType.DATETIME:
+            DateTime dt = (DateTime) v;
+            if (dt.getMillisOfSecond() != 0) {
+                return dt.minusMillis(1);
+            } else if (dt.getSecondOfMinute() != 0) {
+                return dt.minusSeconds(1);
+            } else if (dt.getMinuteOfHour() != 0) {
+                return dt.minusMinutes(1);
+            } else if (dt.getHourOfDay() != 0) {
+                return dt.minusHours(1);
+            } else {
+                return dt.minusDays(1);
+            }
         default:
             return null;
         }
@@ -1240,6 +1255,19 @@ public class AugmentBaseDataVisitor exte
             return Float.valueOf((Float) v + 1);
         case DataType.DOUBLE:
             return Double.valueOf((Double) v + 1);
+        case DataType.DATETIME:
+            DateTime dt = (DateTime) v;
+            if (dt.getMillisOfSecond() != 0) {
+                return dt.plusMillis(1);
+            } else if (dt.getSecondOfMinute() != 0) {
+                return dt.plusSeconds(1);
+            } else if (dt.getMinuteOfHour() != 0) {
+                return dt.plusMinutes(1);
+            } else if (dt.getHourOfDay() != 0) {
+                return dt.plusHours(1);
+            } else {
+                return dt.plusDays(1);
+            }
         default:
             return null;
         }
@@ -1265,6 +1293,8 @@ public class AugmentBaseDataVisitor exte
             return Integer.valueOf(data);
         case DataType.LONG:
             return Long.valueOf(data);
+        case DataType.DATETIME:
+            return new DateTime(data);
         case DataType.CHARARRAY:
             return data;
         default:

Modified: pig/trunk/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/jruby/JrubyScriptEngine.java Fri Aug 24 00:18:05 2012
@@ -171,6 +171,7 @@ public class JrubyScriptEngine extends S
                 case DataType.FLOAT: canonicalName += "Float"; break;
                 case DataType.INTEGER: canonicalName += "Integer"; break;
                 case DataType.LONG: canonicalName += "Long"; break;
+                case DataType.DATETIME: canonicalName += "DateTime"; break;
                 case DataType.MAP: canonicalName += "Map"; break;
                 case DataType.BYTEARRAY: canonicalName += "DataByteArray"; break;
                 default: throw new ExecException("Unable to instantiate Algebraic EvalFunc " + method + " as schema type is invalid");

Modified: pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/jruby/RubySchema.java Fri Aug 24 00:18:05 2012
@@ -295,6 +295,22 @@ public class RubySchema extends RubyObje
     }
 
     /**
+     * This is a static helper method to create a null aliased datetime Schema.
+     * This is useful in cases where you do not want the output to have an explicit
+     * name, which {@link Utils#getSchemaFromString} will assign.
+     *
+     * @param context the context the method is being executed in
+     * @param self    an instance of the RubyClass with metadata on
+     *                the Ruby class object this method is being
+     *                statically invoked against
+     * @return        a null-aliased datetime schema
+     */
+    @JRubyMethod(meta = true, name = {"dt", "datetime"})
+    public static RubySchema nullDateTime(ThreadContext context, IRubyObject self) {
+       return makeNullAliasRubySchema(context, DataType.DATETIME);
+    }
+
+    /**
      * This is a static helper method to create a null aliased tuple Schema.
      * This is useful in cases where you do not want the output to have an explicit
      * name, which {@link Utils#getSchemaFromString} will assign.

Modified: pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java?rev=1376800&r1=1376799&r2=1376800&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java (original)
+++ pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java Fri Aug 24 00:18:05 2012
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import org.apache.pig.LoadCaster;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -161,6 +163,11 @@ public class PigPerformanceLoader extend
         }
 
         @Override
+        public DateTime bytesToDateTime(byte[] arg0) throws IOException {
+            return helper.bytesToDateTime(arg0);
+        }
+
+        @Override
         public Tuple bytesToTuple(byte[] arg0, ResourceFieldSchema fs) throws IOException {
             return helper.bytesToTuple(arg0, fs);
         }