You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/08/08 10:58:31 UTC

[5/5] flink git commit: [FLINK-6429] [table] Bump Calcite version to 1.13.

[FLINK-6429] [table] Bump Calcite version to 1.13.

This closes #4373.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5770fe8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5770fe8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5770fe8

Branch: refs/heads/master
Commit: d5770fe8dd1486d457c87c17a7df8dba276e9bcd
Parents: f471888
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jul 19 14:34:37 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 8 12:56:34 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |    2 +-
 .../calcite/avatica/util/DateTimeUtils.java     | 1044 ++++
 .../apache/calcite/rel/rules/PushProjector.java |  868 +++
 .../calcite/sql/fun/SqlGroupFunction.java       |  103 -
 .../calcite/sql/fun/SqlStdOperatorTable.java    | 2133 -------
 .../apache/calcite/sql/validate/AggChecker.java |  225 -
 .../sql/validate/SqlUserDefinedAggFunction.java |   82 -
 .../calcite/sql2rel/SqlToRelConverter.java      | 5356 ------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  |    2 +-
 .../flink/table/calcite/FlinkTypeSystem.scala   |    2 +-
 .../table/catalog/ExternalCatalogSchema.scala   |    2 +
 .../flink/table/codegen/CodeGenerator.scala     |    3 +
 .../apache/flink/table/expressions/call.scala   |    1 +
 .../apache/flink/table/expressions/time.scala   |   44 +-
 .../table/functions/utils/AggSqlFunction.scala  |    1 +
 .../flink/table/plan/rules/FlinkRuleSets.scala  |    4 +-
 .../flink/table/plan/stats/FlinkStatistic.scala |    5 +-
 .../table/plan/util/RexProgramExtractor.scala   |    3 +
 .../table/api/batch/sql/CorrelateTest.scala     |   24 +-
 .../table/api/batch/table/CorrelateTest.scala   |    6 +-
 .../table/api/stream/sql/CorrelateTest.scala    |   24 +-
 .../table/api/stream/table/CorrelateTest.scala  |   24 +-
 .../table/expressions/ScalarFunctionsTest.scala |    5 +-
 .../plan/TimeIndicatorConversionTest.scala      |    2 +-
 24 files changed, 2002 insertions(+), 7963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 8a7e3ac..0e943ad 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -52,7 +52,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.12.0</version>
+			<version>1.13.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
new file mode 100644
index 0000000..d1a87a7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import java.text.DateFormat;
+import java.text.NumberFormat;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1884 IS FIXED.
+ */
+
+/**
+ * Utility functions for datetime types: date, time, timestamp.
+ *
+ * <p>Used by the JDBC driver.
+ *
+ * <p>TODO: review methods for performance. Due to allocations required, it may
+ * be preferable to introduce a "formatter" with the required state.
+ */
+public class DateTimeUtils {
+	/** The julian date of the epoch, 1970-01-01. */
+	public static final int EPOCH_JULIAN = 2440588;
+
+	private DateTimeUtils() {}
+
+	//~ Static fields/initializers ---------------------------------------------
+
+	/** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+	public static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+	/** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+	public static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+	/** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+	public static final String TIMESTAMP_FORMAT_STRING =
+		DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
+	/** The GMT time zone.
+	 *
+	 * @deprecated Use {@link #UTC_ZONE} */
+	@Deprecated // to be removed before 2.0
+	public static final TimeZone GMT_ZONE = TimeZone.getTimeZone("GMT");
+
+	/** The UTC time zone. */
+	public static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
+
+	/** The Java default time zone. */
+	public static final TimeZone DEFAULT_ZONE = TimeZone.getDefault();
+
+	/**
+	 * The number of milliseconds in a second.
+	 */
+	public static final long MILLIS_PER_SECOND = 1000L;
+
+	/**
+	 * The number of milliseconds in a minute.
+	 */
+	public static final long MILLIS_PER_MINUTE = 60000L;
+
+	/**
+	 * The number of milliseconds in an hour.
+	 */
+	public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
+
+	/**
+	 * The number of milliseconds in a day.
+	 *
+	 * <p>This is the modulo 'mask' used when converting
+	 * TIMESTAMP values to DATE and TIME values.
+	 */
+	public static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+
+	/**
+	 * Calendar set to the epoch (1970-01-01 00:00:00 UTC). Useful for
+	 * initializing other values. Calendars are not immutable, so be careful not
+	 * to screw up this object for everyone else.
+	 */
+	public static final Calendar ZERO_CALENDAR;
+
+	static {
+		ZERO_CALENDAR = Calendar.getInstance(DateTimeUtils.UTC_ZONE, Locale.ROOT);
+		ZERO_CALENDAR.setTimeInMillis(0);
+	}
+
+	//~ Methods ----------------------------------------------------------------
+
+	/**
+	 * Parses a string using {@link SimpleDateFormat} and a given pattern. This
+	 * method parses a string at the specified parse position and if successful,
+	 * updates the parse position to the index after the last character used.
+	 * The parsing is strict (non-lenient): out-of-range fields such as month 13
+	 * or day 32 are rejected instead of being rolled over.
+	 *
+	 * @param s       string to be parsed
+	 * @param dateFormat Date format
+	 * @param tz      time zone in which to interpret string. Defaults to the Java
+	 *                default time zone
+	 * @param pp      position to start parsing from
+	 * @return a Calendar initialized with the parsed value, or null if parsing
+	 * failed. If returned, the Calendar is configured to the UTC time zone.
+	 */
+	private static Calendar parseDateFormat(String s, DateFormat dateFormat,
+											TimeZone tz, ParsePosition pp) {
+		if (tz == null) {
+			tz = DEFAULT_ZONE;
+		}
+		Calendar ret = Calendar.getInstance(tz, Locale.ROOT);
+		dateFormat.setCalendar(ret);
+		dateFormat.setLenient(false);
+
+		final Date d = dateFormat.parse(s, pp);
+		if (null == d) {
+			return null;
+		}
+		ret.setTime(d);
+		ret.setTimeZone(UTC_ZONE);
+		return ret;
+	}
+
+	@Deprecated // to be removed before 2.0
+	public static Calendar parseDateFormat(String s, String pattern,
+										   TimeZone tz) {
+		return parseDateFormat(s, new SimpleDateFormat(pattern, Locale.ROOT), tz);
+	}
+
+	/**
+	 * Parses a string using {@link SimpleDateFormat} and a given pattern. The
+	 * entire string must match the pattern specified.
+	 *
+	 * @param s       string to be parsed
+	 * @param dateFormat Date format
+	 * @param tz      time zone in which to interpret string. Defaults to the Java
+	 *                default time zone
+	 * @return a Calendar initialized with the parsed value, or null if parsing
+	 * failed. If returned, the Calendar is configured to the UTC time zone.
+	 */
+	public static Calendar parseDateFormat(String s, DateFormat dateFormat,
+										   TimeZone tz) {
+		ParsePosition pp = new ParsePosition(0);
+		Calendar ret = parseDateFormat(s, dateFormat, tz, pp);
+		if (pp.getIndex() != s.length()) {
+			// Didn't consume entire string - not good
+			return null;
+		}
+		return ret;
+	}
+
+	@Deprecated // to be removed before 2.0
+	public static PrecisionTime parsePrecisionDateTimeLiteral(
+		String s,
+		String pattern,
+		TimeZone tz) {
+		assert pattern != null;
+		return parsePrecisionDateTimeLiteral(s,
+			new SimpleDateFormat(pattern, Locale.ROOT), tz, 3);
+	}
+
+	/**
+	 * Parses a string using {@link SimpleDateFormat} and a given pattern, and
+	 * if present, parses a fractional seconds component. The fractional seconds
+	 * component must begin with a decimal point ('.') followed by numeric
+	 * digits. The precision is rounded to a maximum of 3 digits of fractional
+	 * seconds precision (to obtain milliseconds).
+	 *
+	 * @param s       string to be parsed
+	 * @param dateFormat Date format
+	 * @param tz      time zone in which to interpret string. Defaults to the
+	 *                local time zone
+	 * @return a {@link DateTimeUtils.PrecisionTime PrecisionTime} initialized
+	 * with the parsed value, or null if parsing failed. The PrecisionTime
+	 * contains a UTC Calendar, the fraction digits and a precision.
+	 */
+	public static PrecisionTime parsePrecisionDateTimeLiteral(String s,
+															  DateFormat dateFormat, TimeZone tz, int maxPrecision) {
+		final ParsePosition pp = new ParsePosition(0);
+		final Calendar cal = parseDateFormat(s, dateFormat, tz, pp);
+		if (cal == null) {
+			return null; // Invalid date/time format
+		}
+
+		// Note: the Java SimpleDateFormat 'S' treats any number after
+		// the decimal as milliseconds. That means 12:00:00.9 has 9
+		// milliseconds and 12:00:00.9999 has 9999 milliseconds.
+		int p = 0;
+		String secFraction = "";
+		if (pp.getIndex() < s.length()) {
+			// Check to see if rest is decimal portion
+			if (s.charAt(pp.getIndex()) != '.') {
+				return null;
+			}
+
+			// Skip decimal sign
+			pp.setIndex(pp.getIndex() + 1);
+
+			// Parse decimal portion
+			if (pp.getIndex() < s.length()) {
+				secFraction = s.substring(pp.getIndex());
+				if (!secFraction.matches("\\d+")) {
+					return null;
+				}
+				NumberFormat nf = NumberFormat.getIntegerInstance(Locale.ROOT);
+				Number num = nf.parse(s, pp);
+				if ((num == null) || (pp.getIndex() != s.length())) {
+					// Invalid decimal portion
+					return null;
+				}
+
+				// Determine precision - only support prec 3 or lower
+				// (milliseconds) Higher precisions are quietly rounded away
+				p = secFraction.length();
+				if (maxPrecision >= 0) {
+					// If there is a maximum precision, ignore subsequent digits
+					p = Math.min(maxPrecision, p);
+					secFraction = secFraction.substring(0, p);
+				}
+
+				// Calculate milliseconds
+				String millis = secFraction;
+				if (millis.length() > 3) {
+					millis = secFraction.substring(0, 3);
+				}
+				while (millis.length() < 3) {
+					millis = millis + "0";
+				}
+
+				int ms = Integer.valueOf(millis);
+				cal.add(Calendar.MILLISECOND, ms);
+			}
+		}
+
+		assert pp.getIndex() == s.length();
+		return new PrecisionTime(cal, secFraction, p);
+	}
+
+	/**
+	 * Gets the active time zone based on a Calendar argument
+	 */
+	public static TimeZone getTimeZone(Calendar cal) {
+		if (cal == null) {
+			return DEFAULT_ZONE;
+		}
+		return cal.getTimeZone();
+	}
+
+	/**
+	 * Checks if the date/time format is valid
+	 *
+	 * @param pattern {@link SimpleDateFormat}  pattern
+	 * @throws IllegalArgumentException if the given pattern is invalid
+	 */
+	public static void checkDateFormat(String pattern) {
+		new SimpleDateFormat(pattern, Locale.ROOT);
+	}
+
+	/**
+	 * Creates a new date formatter with Farrago specific options. Farrago
+	 * parsing is strict and does not allow values such as day 0, month 13, etc.
+	 *
+	 * @param format {@link SimpleDateFormat}  pattern
+	 */
+	public static SimpleDateFormat newDateFormat(String format) {
+		SimpleDateFormat sdf = new SimpleDateFormat(format, Locale.ROOT);
+		sdf.setLenient(false);
+		return sdf;
+	}
+
+	/** Helper for CAST({timestamp} AS VARCHAR(n)). */
+	public static String unixTimestampToString(long timestamp) {
+		return unixTimestampToString(timestamp, 0);
+	}
+
+	public static String unixTimestampToString(long timestamp, int precision) {
+		final StringBuilder buf = new StringBuilder(17);
+		int date = (int) (timestamp / MILLIS_PER_DAY);
+		int time = (int) (timestamp % MILLIS_PER_DAY);
+		if (time < 0) {
+			--date;
+			time += MILLIS_PER_DAY;
+		}
+		unixDateToString(buf, date);
+		buf.append(' ');
+		unixTimeToString(buf, time, precision);
+		return buf.toString();
+	}
+
+	/** Helper for CAST({time} AS VARCHAR(n)). */
+	public static String unixTimeToString(int time) {
+		return unixTimeToString(time, 0);
+	}
+
+	public static String unixTimeToString(int time, int precision) {
+		final StringBuilder buf = new StringBuilder(8);
+		unixTimeToString(buf, time, precision);
+		return buf.toString();
+	}
+
+	private static void unixTimeToString(StringBuilder buf, int time,
+										 int precision) {
+		int h = time / 3600000;
+		int time2 = time % 3600000;
+		int m = time2 / 60000;
+		int time3 = time2 % 60000;
+		int s = time3 / 1000;
+		int ms = time3 % 1000;
+		int2(buf, h);
+		buf.append(':');
+		int2(buf, m);
+		buf.append(':');
+		int2(buf, s);
+		if (precision > 0) {
+			buf.append('.');
+			while (precision > 0) {
+				buf.append((char) ('0' + (ms / 100)));
+				ms = ms % 100;
+				ms = ms * 10;
+				--precision;
+			}
+		}
+	}
+
+	private static void int2(StringBuilder buf, int i) {
+		buf.append((char) ('0' + (i / 10) % 10));
+		buf.append((char) ('0' + i % 10));
+	}
+
+	private static void int4(StringBuilder buf, int i) {
+		buf.append((char) ('0' + (i / 1000) % 10));
+		buf.append((char) ('0' + (i / 100) % 10));
+		buf.append((char) ('0' + (i / 10) % 10));
+		buf.append((char) ('0' + i % 10));
+	}
+
+	/** Helper for CAST({date} AS VARCHAR(n)). */
+	public static String unixDateToString(int date) {
+		final StringBuilder buf = new StringBuilder(10);
+		unixDateToString(buf, date);
+		return buf.toString();
+	}
+
+	private static void unixDateToString(StringBuilder buf, int date) {
+		julianToString(buf, date + EPOCH_JULIAN);
+	}
+
+	private static void julianToString(StringBuilder buf, int julian) {
+		// Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+		int b, c;
+		if (julian > 2299160) {
+			int a = julian + 32044;
+			b = (4 * a + 3) / 146097;
+			c = a - b *146097 / 4;
+		} else {
+			b = 0;
+			c = julian + 32082;
+		}
+		int d = (4 * c + 3) / 1461;
+		int e = c - (1461 * d) / 4;
+		int m = (5 * e + 2) / 153;
+		int day = e - (153 * m + 2) / 5 + 1;
+		int month = m + 3 - 12 * (m / 10);
+		int year = b * 100 + d - 4800 + (m / 10);
+
+		int4(buf, year);
+		buf.append('-');
+		int2(buf, month);
+		buf.append('-');
+		int2(buf, day);
+	}
+
+	public static String intervalYearMonthToString(int v, TimeUnitRange range) {
+		final StringBuilder buf = new StringBuilder();
+		if (v >= 0) {
+			buf.append('+');
+		} else {
+			buf.append('-');
+			v = -v;
+		}
+		final int y;
+		final int m;
+		switch (range) {
+			case YEAR:
+				v = roundUp(v, 12);
+				y = v / 12;
+				buf.append(y);
+				break;
+			case YEAR_TO_MONTH:
+				y = v / 12;
+				buf.append(y);
+				buf.append('-');
+				m = v % 12;
+				number(buf, m, 2);
+				break;
+			case MONTH:
+				m = v;
+				buf.append(m);
+				break;
+			default:
+				throw new AssertionError(range);
+		}
+		return buf.toString();
+	}
+
+	public static StringBuilder number(StringBuilder buf, int v, int n) {
+		for (int k = digitCount(v); k < n; k++) {
+			buf.append('0');
+		}
+		return buf.append(v);
+	}
+
+	public static int digitCount(int v) {
+		for (int n = 1;; n++) {
+			v /= 10;
+			if (v == 0) {
+				return n;
+			}
+		}
+	}
+
+	private static int roundUp(int dividend, int divisor) {
+		int remainder = dividend % divisor;
+		dividend -= remainder;
+		if (remainder * 2 > divisor) {
+			dividend += divisor;
+		}
+		return dividend;
+	}
+
+	/** Cheap, unsafe (no overflow check), long power. powerX(2, 3) returns 8. */
+	public static long powerX(long a, long b) {
+		long x = 1;
+		while (b > 0) {
+			x *= a;
+			--b;
+		}
+		return x;
+	}
+
+	public static String intervalDayTimeToString(long v, TimeUnitRange range,
+												 int scale) {
+		final StringBuilder buf = new StringBuilder();
+		if (v >= 0) {
+			buf.append('+');
+		} else {
+			buf.append('-');
+			v = -v;
+		}
+		final long ms;
+		final long s;
+		final long m;
+		final long h;
+		final long d;
+		switch (range) {
+			case DAY_TO_SECOND:
+				v = roundUp(v, powerX(10, 3 - scale));
+				ms = v % 1000;
+				v /= 1000;
+				s = v % 60;
+				v /= 60;
+				m = v % 60;
+				v /= 60;
+				h = v % 24;
+				v /= 24;
+				d = v;
+				buf.append((int) d);
+				buf.append(' ');
+				number(buf, (int) h, 2);
+				buf.append(':');
+				number(buf, (int) m, 2);
+				buf.append(':');
+				number(buf, (int) s, 2);
+				fraction(buf, scale, ms);
+				break;
+			case DAY_TO_MINUTE:
+				v = roundUp(v, 1000 * 60);
+				v /= 1000;
+				v /= 60;
+				m = v % 60;
+				v /= 60;
+				h = v % 24;
+				v /= 24;
+				d = v;
+				buf.append((int) d);
+				buf.append(' ');
+				number(buf, (int) h, 2);
+				buf.append(':');
+				number(buf, (int) m, 2);
+				break;
+			case DAY_TO_HOUR:
+				v = roundUp(v, 1000 * 60 * 60);
+				v /= 1000;
+				v /= 60;
+				v /= 60;
+				h = v % 24;
+				v /= 24;
+				d = v;
+				buf.append((int) d);
+				buf.append(' ');
+				number(buf, (int) h, 2);
+				break;
+			case DAY:
+				v = roundUp(v, 1000 * 60 * 60 * 24);
+				d = v / (1000 * 60 * 60 * 24);
+				buf.append((int) d);
+				break;
+			case HOUR:
+				v = roundUp(v, 1000 * 60 * 60);
+				v /= 1000;
+				v /= 60;
+				v /= 60;
+				h = v;
+				buf.append((int) h);
+				break;
+			case HOUR_TO_MINUTE:
+				v = roundUp(v, 1000 * 60);
+				v /= 1000;
+				v /= 60;
+				m = v % 60;
+				v /= 60;
+				h = v;
+				buf.append((int) h);
+				buf.append(':');
+				number(buf, (int) m, 2);
+				break;
+			case HOUR_TO_SECOND:
+				v = roundUp(v, powerX(10, 3 - scale));
+				ms = v % 1000;
+				v /= 1000;
+				s = v % 60;
+				v /= 60;
+				m = v % 60;
+				v /= 60;
+				h = v;
+				buf.append((int) h);
+				buf.append(':');
+				number(buf, (int) m, 2);
+				buf.append(':');
+				number(buf, (int) s, 2);
+				fraction(buf, scale, ms);
+				break;
+			case MINUTE_TO_SECOND:
+				v = roundUp(v, powerX(10, 3 - scale));
+				ms = v % 1000;
+				v /= 1000;
+				s = v % 60;
+				v /= 60;
+				m = v;
+				buf.append((int) m);
+				buf.append(':');
+				number(buf, (int) s, 2);
+				fraction(buf, scale, ms);
+				break;
+			case MINUTE:
+				v = roundUp(v, 1000 * 60);
+				v /= 1000;
+				v /= 60;
+				m = v;
+				buf.append((int) m);
+				break;
+			case SECOND:
+				v = roundUp(v, powerX(10, 3 - scale));
+				ms = v % 1000;
+				v /= 1000;
+				s = v;
+				buf.append((int) s);
+				fraction(buf, scale, ms);
+				break;
+			default:
+				throw new AssertionError(range);
+		}
+		return buf.toString();
+	}
+
+	/**
+	 * Rounds a dividend to the nearest divisor.
+	 * For example roundUp(31, 10) yields 30; roundUp(37, 10) yields 40.
+	 * @param dividend Number to be divided
+	 * @param divisor Number to divide by
+	 * @return Rounded dividend
+	 */
+	private static long roundUp(long dividend, long divisor) {
+		long remainder = dividend % divisor;
+		dividend -= remainder;
+		if (remainder * 2 > divisor) {
+			dividend += divisor;
+		}
+		return dividend;
+	}
+
+	private static void fraction(StringBuilder buf, int scale, long ms) {
+		if (scale > 0) {
+			buf.append('.');
+			long v1 = scale == 3 ? ms
+				: scale == 2 ? ms / 10
+				: scale == 1 ? ms / 100
+				: 0;
+			number(buf, (int) v1, scale);
+		}
+	}
+
+	public static int dateStringToUnixDate(String s) {
+		int hyphen1 = s.indexOf('-');
+		int y;
+		int m;
+		int d;
+		if (hyphen1 < 0) {
+			y = Integer.parseInt(s.trim());
+			m = 1;
+			d = 1;
+		} else {
+			y = Integer.parseInt(s.substring(0, hyphen1).trim());
+			final int hyphen2 = s.indexOf('-', hyphen1 + 1);
+			if (hyphen2 < 0) {
+				m = Integer.parseInt(s.substring(hyphen1 + 1).trim());
+				d = 1;
+			} else {
+				m = Integer.parseInt(s.substring(hyphen1 + 1, hyphen2).trim());
+				d = Integer.parseInt(s.substring(hyphen2 + 1).trim());
+			}
+		}
+		return ymdToUnixDate(y, m, d);
+	}
+
+	public static int timeStringToUnixDate(String v) {
+		return timeStringToUnixDate(v, 0);
+	}
+
+	public static int timeStringToUnixDate(String v, int start) {
+		final int colon1 = v.indexOf(':', start);
+		int hour;
+		int minute;
+		int second;
+		int milli;
+		if (colon1 < 0) {
+			hour = Integer.parseInt(v.substring(start).trim()); // honor start: v may carry a date prefix
+			minute = 0; // absent fields default to zero, so "12" means 12:00:00
+			second = 0;
+			milli = 0;
+		} else {
+			hour = Integer.parseInt(v.substring(start, colon1).trim());
+			final int colon2 = v.indexOf(':', colon1 + 1);
+			if (colon2 < 0) {
+				minute = Integer.parseInt(v.substring(colon1 + 1).trim());
+				second = 0; // "hh:mm" carries no seconds component
+				milli = 0;
+			} else {
+				minute = Integer.parseInt(v.substring(colon1 + 1, colon2).trim());
+				int dot = v.indexOf('.', colon2);
+				if (dot < 0) {
+					second = Integer.parseInt(v.substring(colon2 + 1).trim());
+					milli = 0;
+				} else {
+					second = Integer.parseInt(v.substring(colon2 + 1, dot).trim());
+					milli = parseFraction(v.substring(dot + 1).trim(), 100); // fraction digits scaled to milliseconds
+				}
+			}
+		}
+		return hour * (int) MILLIS_PER_HOUR
+			+ minute * (int) MILLIS_PER_MINUTE
+			+ second * (int) MILLIS_PER_SECOND
+			+ milli;
+	}
+
+	/** Parses a fraction, multiplying the first character by {@code multiplier},
+	 * the second character by {@code multiplier / 10},
+	 * the third character by {@code multiplier / 100}, and so forth.
+	 *
+	 * <p>For example, {@code parseFraction("1234", 100)} yields {@code 123}. */
+	private static int parseFraction(String v, int multiplier) {
+		int r = 0;
+		for (int i = 0; i < v.length(); i++) {
+			char c = v.charAt(i);
+			int x = c < '0' || c > '9' ? 0 : (c - '0');
+			r += multiplier * x;
+			if (multiplier < 10) {
+				// We're at the last digit. Check for rounding.
+				if (i + 1 < v.length()
+					&& v.charAt(i + 1) >= '5') {
+					++r;
+				}
+				break;
+			}
+			multiplier /= 10;
+		}
+		return r;
+	}
+
+	public static long timestampStringToUnixDate(String s) {
+		final long d;
+		final long t;
+		s = s.trim();
+		int space = s.indexOf(' ');
+		if (space >= 0) {
+			d = dateStringToUnixDate(s.substring(0, space));
+			t = timeStringToUnixDate(s, space + 1);
+		} else {
+			d = dateStringToUnixDate(s);
+			t = 0;
+		}
+		return d * MILLIS_PER_DAY + t;
+	}
+
+	public static long unixDateExtract(TimeUnitRange range, long date) {
+		return julianExtract(range, (int) date + EPOCH_JULIAN);
+	}
+
+	private static int julianExtract(TimeUnitRange range, int julian) {
+		// Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+		int b, c;
+		if (julian > 2299160) {
+			int a = julian + 32044;
+			b = (4 * a + 3) / 146097;
+			c = a - b *146097 / 4;
+		} else {
+			b = 0;
+			c = julian + 32082;
+		}
+		int d = (4 * c + 3) / 1461;
+		int e = c - (1461 * d) / 4;
+		int m = (5 * e + 2) / 153;
+		int day = e - (153 * m + 2) / 5 + 1;
+		int month = m + 3 - 12 * (m / 10);
+		int year = b * 100 + d - 4800 + (m / 10);
+
+		switch (range) {
+			case YEAR:
+				return year;
+			case QUARTER:
+				return (month + 2) / 3;
+			case MONTH:
+				return month;
+			case DAY:
+				return day;
+			case DOW:
+				return (int) floorMod(julian + 1, 7) + 1; // sun=1, sat=7
+			case WEEK:
+				long fmofw = firstMondayOfFirstWeek(year);
+				if (julian < fmofw) {
+					fmofw = firstMondayOfFirstWeek(year - 1);
+				}
+				return (int) (julian - fmofw) / 7 + 1;
+			case DOY:
+				final long janFirst = ymdToJulian(year, 1, 1);
+				return (int) (julian - janFirst) + 1;
+			case CENTURY:
+				return year > 0
+					? (year + 99) / 100
+					: (year - 99) / 100;
+			case MILLENNIUM:
+				return year > 0
+					? (year + 999) / 1000
+					: (year - 999) / 1000;
+			default:
+				throw new AssertionError(range);
+		}
+	}
+
+	/** Returns the first day of the first week of a year.
+	 * Per ISO-8601 it is the Monday of the week that contains Jan 4,
+	 * or equivalently, it is a Monday between Dec 29 and Jan 4.
+	 * Sometimes it is in the year before the given year. */
+	private static long firstMondayOfFirstWeek(int year) {
+		final long janFirst = ymdToJulian(year, 1, 1);
+		final long janFirstDow = floorMod(janFirst + 1, 7); // sun=0, sat=6
+		return janFirst + (11 - janFirstDow) % 7 - 3;
+	}
+
+	/** Extracts a time unit from a UNIX timestamp (milliseconds since epoch). */
+	public static int unixTimestampExtract(TimeUnitRange range,
+										   long timestamp) {
+		return unixTimeExtract(range, (int) floorMod(timestamp, MILLIS_PER_DAY));
+	}
+
+	/** Extracts a time unit from a time value (milliseconds since midnight). */
+	public static int unixTimeExtract(TimeUnitRange range, int time) {
+		assert time >= 0;
+		assert time < MILLIS_PER_DAY;
+		switch (range) {
+			case HOUR:
+				return time / (int) MILLIS_PER_HOUR;
+			case MINUTE:
+				final int minutes = time / (int) MILLIS_PER_MINUTE;
+				return minutes % 60;
+			case SECOND:
+				final int seconds = time / (int) MILLIS_PER_SECOND;
+				return seconds % 60;
+			default:
+				throw new AssertionError(range);
+		}
+	}
+
+	/** Resets to zero the "time" part of a timestamp, i.e. rounds it down to a whole number of days. */
+	public static long resetTime(long timestamp) {
+		int date = (int) floorDiv(timestamp, MILLIS_PER_DAY); // floor, not truncate: pre-epoch values must stay on their own day
+		return (long) date * MILLIS_PER_DAY;
+	}
+
+	/** Resets to epoch (1970-01-01) the "date" part of a timestamp. */
+	public static long resetDate(long timestamp) {
+		return floorMod(timestamp, MILLIS_PER_DAY);
+	}
+
+	public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+		int date = (int) floorDiv(timestamp, MILLIS_PER_DAY); // floor division so a pre-epoch timestamp maps to the day that contains it
+		final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+		return (long) f * MILLIS_PER_DAY; // days since epoch back to milliseconds
+	}
+
+	public static long unixDateFloor(TimeUnitRange range, long date) {
+		return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+	}
+
+	public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+		int date = (int) (-floorDiv(-timestamp, MILLIS_PER_DAY)); // ceiling division: any time-of-day pushes to the next day before the range ceiling
+		final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+		return (long) f * MILLIS_PER_DAY; // days since epoch back to milliseconds
+	}
+
+	public static long unixDateCeil(TimeUnitRange range, long date) {
+		return julianDateFloor(range, (int) date + EPOCH_JULIAN, false); // floor=false selects the ceiling branch
+	}
+
+	private static int julianDateFloor(TimeUnitRange range, int julian,
+									   boolean floor) {
+		// Algorithm the book "Astronomical Algorithms" by Jean Meeus, 1998
+		int b, c;
+		if (julian > 2299160) {
+			int a = julian + 32044;
+			b = (4 * a + 3) / 146097;
+			c = a - b *146097 / 4;
+		} else {
+			b = 0;
+			c = julian + 32082;
+		}
+		int d = (4 * c + 3) / 1461;
+		int e = c - (1461 * d) / 4;
+		int m = (5 * e + 2) / 153;
+		int day = e - (153 * m + 2) / 5 + 1;
+		int month = m + 3 - 12 * (m / 10);
+		int year = b * 100 + d - 4800 + (m / 10);
+
+		switch (range) {
+			case YEAR:
+				if (!floor && (month > 1 || day > 1)) {
+					++year;
+				}
+				return ymdToUnixDate(year, 1, 1);
+			case MONTH:
+				if (!floor && day > 1) {
+					++month;
+				}
+				return ymdToUnixDate(year, month, 1);
+			default:
+				throw new AssertionError(range);
+		}
+	}
+
+	public static int ymdToUnixDate(int year, int month, int day) {
+		final int julian = ymdToJulian(year, month, day);
+		return julian - EPOCH_JULIAN;
+	}
+
+	public static int ymdToJulian(int year, int month, int day) {
+		int a = (14 - month) / 12;
+		int y = year + 4800 - a;
+		int m = month + 12 * a - 3;
+		int j = day + (153 * m + 2) / 5
+			+ 365 * y
+			+ y / 4
+			- y / 100
+			+ y / 400
+			- 32045;
+		if (j < 2299161) {
+			j = day + (153 * m + 2) / 5 + 365 * y + y / 4 - 32083;
+		}
+		return j;
+	}
+
+	public static long unixTimestamp(int year, int month, int day, int hour,
+									 int minute, int second) {
+		final int date = ymdToUnixDate(year, month, day);
+		return (long) date * MILLIS_PER_DAY
+			+ (long) hour * MILLIS_PER_HOUR
+			+ (long) minute * MILLIS_PER_MINUTE
+			+ (long) second * MILLIS_PER_SECOND;
+	}
+
+	/** Adds a given number of months to a timestamp, represented as the number
+	 * of milliseconds since the epoch. */
+	public static long addMonths(long timestamp, int m) {
+		final long millis =
+			DateTimeUtils.floorMod(timestamp, DateTimeUtils.MILLIS_PER_DAY);
+		timestamp -= millis;
+		final long x =
+			addMonths((int) (timestamp / DateTimeUtils.MILLIS_PER_DAY), m);
+		return x * DateTimeUtils.MILLIS_PER_DAY + millis;
+	}
+
+	/** Adds a given number of months to a date, represented as the number of
+	 * days since the epoch. A day missing from the target month rolls over to the 1st of the next month. */
+	public static int addMonths(int date, int m) {
+		int y0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.YEAR, date);
+		int m0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.MONTH, date);
+		int d0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.DAY, date);
+		final long months = (long) y0 * 12 + (m0 - 1) + m; // absolute zero-based month index
+		y0 = (int) floorDiv(months, 12); // floor keeps the month within 1..12 for negative offsets too
+		m0 = (int) floorMod(months, 12) + 1; // lastDay below needs a real month number, not 0 or 14
+		int last = lastDay(y0, m0);
+		if (d0 > last) {
+			d0 = 1;
+			if (++m0 > 12) {
+				m0 = 1;
+				++y0;
+			}
+		}
+		return DateTimeUtils.ymdToUnixDate(y0, m0, d0);
+	}
+
+	private static int lastDay(int y, int m) {
+		switch (m) {
+			case 2:
+				return y % 4 == 0
+					&& (y % 100 != 0
+					|| y % 400 == 0)
+					? 29 : 28;
+			case 4:
+			case 6:
+			case 9:
+			case 11:
+				return 30;
+			default:
+				return 31;
+		}
+	}
+
+	/** Finds the number of months between two dates, each represented as the
+	 * number of days since the epoch. */
+	public static int subtractMonths(int date0, int date1) {
+		if (date0 < date1) {
+			return -subtractMonths(date1, date0);
+		}
+		// Start with an estimate.
+		// Since no month has more than 31 days, the estimate is <= the true value.
+		int m = (date0 - date1) / 31;
+		for (;;) {
+			int date2 = addMonths(date1, m);
+			if (date2 >= date0) {
+				return m;
+			}
+			int date3 = addMonths(date1, m + 1);
+			if (date3 > date0) {
+				return m;
+			}
+			++m;
+		}
+	}
+
+	public static int subtractMonths(long t0, long t1) {
+		final long millis0 =
+			DateTimeUtils.floorMod(t0, DateTimeUtils.MILLIS_PER_DAY);
+		final int d0 = (int) DateTimeUtils.floorDiv(t0 - millis0,
+			DateTimeUtils.MILLIS_PER_DAY);
+		final long millis1 =
+			DateTimeUtils.floorMod(t1, DateTimeUtils.MILLIS_PER_DAY);
+		final int d1 = (int) DateTimeUtils.floorDiv(t1 - millis1,
+			DateTimeUtils.MILLIS_PER_DAY);
+		int x = subtractMonths(d0, d1);
+		final long d2 = addMonths(d1, x);
+		if (d2 == d0 && millis0 < millis1) {
+			--x;
+		}
+		return x;
+	}
+
+	/** Divide, rounding towards negative infinity. */
+	public static long floorDiv(long x, long y) {
+		long r = x / y;
+		// if the signs are different and modulo not zero, round down
+		if ((x ^ y) < 0 && (r * y != x)) {
+			r--;
+		}
+		return r;
+	}
+
+	/** Modulo, always returning a non-negative result for a positive divisor. */
+	public static long floorMod(long x, long y) {
+		return x - floorDiv(x, y) * y;
+	}
+
+	/** Creates an instance of {@link Calendar} in the root locale and UTC time
+	 * zone. */
+	public static Calendar calendar() {
+		return Calendar.getInstance(UTC_ZONE, Locale.ROOT);
+	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	/**
+	 * Helper class for {@link DateTimeUtils#parsePrecisionDateTimeLiteral}
+	 */
+	public static class PrecisionTime {
+		private final Calendar cal;
+		private final String fraction;
+		private final int precision;
+
+		public PrecisionTime(Calendar cal, String fraction, int precision) {
+			this.cal = cal;
+			this.fraction = fraction;
+			this.precision = precision;
+		}
+
+		public Calendar getCalendar() {
+			return cal;
+		}
+
+		public int getPrecision() {
+			return precision;
+		}
+
+		public String getFraction() {
+			return fraction;
+		}
+	}
+}
+
+// End DateTimeUtils.java

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
new file mode 100644
index 0000000..0955aeb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.runtime.PredicateImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Set;
+
+// This class is copied from Apache Calcite except that it does not
+// automatically name the field using the name of the operators
+// as the Table API rejects special characters like '-' in the field names.
+
+/**
+ * PushProjector is a utility class used to perform operations used in push
+ * projection rules.
+ *
+ * <p>Pushing is particularly interesting in the case of join, because there
+ * are multiple inputs. Generally an expression can be pushed down to a
+ * particular input if it depends upon no other inputs. If it can be pushed
+ * down to both sides, it is pushed down to the left.
+ *
+ * <p>Sometimes an expression needs to be split before it can be pushed down.
+ * To flag that an expression cannot be split, specify a rule that it must be
+ * <dfn>preserved</dfn>. Such an expression will be pushed down intact to one
+ * of the inputs, or not pushed down at all.</p>
+ */
+public class PushProjector {
+  //~ Instance fields --------------------------------------------------------
+
+  private final Project origProj;
+  private final RexNode origFilter;
+  private final RelNode childRel;
+  private final ExprCondition preserveExprCondition;
+  private final RelBuilder relBuilder;
+
+  /**
+   * Original projection expressions
+   */
+  final List<RexNode> origProjExprs;
+
+  /**
+   * Fields from the RelNode that the projection is being pushed past
+   */
+  final List<RelDataTypeField> childFields;
+
+  /**
+   * Number of fields in the RelNode that the projection is being pushed past
+   */
+  final int nChildFields;
+
+  /**
+   * Bitmap containing the references in the original projection
+   */
+  final BitSet projRefs;
+
+  /**
+   * Bitmap containing the fields in the RelNode that the projection is being
+   * pushed past, if the RelNode is not a join. If the RelNode is a join, then
+   * the fields correspond to the left hand side of the join.
+   */
+  final ImmutableBitSet childBitmap;
+
+  /**
+   * Bitmap containing the fields in the right hand side of a join, in the
+   * case where the projection is being pushed past a join. Not used
+   * otherwise.
+   */
+  final ImmutableBitSet rightBitmap;
+
+  /**
+   * Bitmap containing the fields that should be strong, i.e. an expression can only
+   * be preserved if it is null whenever these fields are null.
+   */
+  final ImmutableBitSet strongBitmap;
+
+  /**
+   * Number of fields in the RelNode that the projection is being pushed past,
+   * if the RelNode is not a join. If the RelNode is a join, then this is the
+   * number of fields in the left hand side of the join.
+   *
+   * <p>The identity
+   * {@code nChildFields == nSysFields + nFields + nFieldsRight}
+   * holds. {@code nFields} does not include {@code nSysFields}.
+   * The output of a join looks like this:
+   *
+   * <blockquote><pre>
+   * | nSysFields | nFields | nFieldsRight |
+   * </pre></blockquote>
+   *
+   * <p>The output of a single-input rel looks like this:
+   *
+   * <blockquote><pre>
+   * | nSysFields | nFields |
+   * </pre></blockquote>
+   */
+  final int nFields;
+
+  /**
+   * Number of fields in the right hand side of a join, in the case where the
+   * projection is being pushed past a join. Always 0 otherwise.
+   */
+  final int nFieldsRight;
+
+  /**
+   * Number of system fields. System fields appear at the start of a join,
+   * before the first field from the left input.
+   */
+  private final int nSysFields;
+
+  /**
+   * Expressions referenced in the projection/filter that should be preserved.
+   * In the case where the projection is being pushed past a join, then the
+   * list only contains the expressions corresponding to the left hand side of
+   * the join.
+   */
+  final List<RexNode> childPreserveExprs;
+
+  /**
+   * Expressions referenced in the projection/filter that should be preserved,
+   * corresponding to expressions on the right hand side of the join, if the
+   * projection is being pushed past a join. Empty list otherwise.
+   */
+  final List<RexNode> rightPreserveExprs;
+
+  /**
+   * Number of system fields being projected.
+   */
+  int nSystemProject;
+
+  /**
+   * Number of fields being projected. In the case where the projection is
+   * being pushed past a join, the number of fields being projected from the
+   * left hand side of the join.
+   */
+  int nProject;
+
+  /**
+   * Number of fields being projected from the right hand side of a join, in
+   * the case where the projection is being pushed past a join. 0 otherwise.
+   */
+  int nRightProject;
+
+  /**
+   * Rex builder used to create new expressions.
+   */
+  final RexBuilder rexBuilder;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a PushProjector object for pushing projects past a RelNode.
+   *
+   * @param origProj              the original projection that is being pushed;
+   *                              may be null if it is implied because a
+   *                              projection was trivially removed
+   * @param origFilter            the filter that the projection must also be
+   *                              pushed past, if applicable
+   * @param childRel              the RelNode that the projection is being
+   *                              pushed past
+   * @param preserveExprCondition condition for whether an expression should
+   *                              be preserved in the projection
+   * @param relBuilder            builder for new RelNodes; must not be null
+   */
+  public PushProjector(
+      Project origProj,
+      RexNode origFilter,
+      RelNode childRel,
+      ExprCondition preserveExprCondition,
+      RelBuilder relBuilder) {
+    this.origProj = origProj;
+    this.origFilter = origFilter;
+    this.childRel = childRel;
+    this.preserveExprCondition = preserveExprCondition;
+    this.relBuilder = Preconditions.checkNotNull(relBuilder);
+    if (origProj == null) {
+      origProjExprs = ImmutableList.of(); // no projection: nothing to scan for references
+    } else {
+      origProjExprs = origProj.getProjects();
+    }
+
+    childFields = childRel.getRowType().getFieldList();
+    nChildFields = childFields.size();
+
+    projRefs = new BitSet(nChildFields);
+    if (childRel instanceof Join) {
+      Join joinRel = (Join) childRel;
+      List<RelDataTypeField> leftFields =
+          joinRel.getLeft().getRowType().getFieldList();
+      List<RelDataTypeField> rightFields =
+          joinRel.getRight().getRowType().getFieldList();
+      nFields = leftFields.size();
+      nFieldsRight = childRel instanceof SemiJoin ? 0 : rightFields.size(); // a SemiJoin counts no right fields
+      nSysFields = joinRel.getSystemFieldList().size();
+      childBitmap =
+          ImmutableBitSet.range(nSysFields, nFields + nSysFields);
+      rightBitmap =
+          ImmutableBitSet.range(nFields + nSysFields, nChildFields);
+
+      switch (joinRel.getJoinType()) {
+      case INNER: // no column is required to be strong
+        strongBitmap = ImmutableBitSet.of();
+        break;
+      case RIGHT:  // All the left-input's columns must be strong
+        strongBitmap = ImmutableBitSet.range(nSysFields, nFields + nSysFields);
+        break;
+      case LEFT: // All the right-input's columns must be strong
+        strongBitmap = ImmutableBitSet.range(nFields + nSysFields, nChildFields);
+        break;
+      case FULL:
+      default: // FULL and any other join type: all non-system columns must be strong
+        strongBitmap = ImmutableBitSet.range(nSysFields, nChildFields);
+      }
+
+    } else {
+      nFields = nChildFields;
+      nFieldsRight = 0;
+      childBitmap = ImmutableBitSet.range(nChildFields);
+      rightBitmap = null; // there is no right-hand side when the child is not a join
+      nSysFields = 0;
+      strongBitmap = ImmutableBitSet.of();
+    }
+    assert nChildFields == nSysFields + nFields + nFieldsRight;
+
+    childPreserveExprs = new ArrayList<RexNode>();
+    rightPreserveExprs = new ArrayList<RexNode>();
+
+    rexBuilder = childRel.getCluster().getRexBuilder();
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Decomposes a projection to the input references referenced by a
+   * projection and a filter, either of which is optional. If both are
+   * provided, the filter is underneath the project.
+   *
+   * <p>Creates a projection containing all input references as well as
+   * preserving any special expressions. Converts the original projection
+   * and/or filter to reference the new projection. Then, finally, puts a
+   * final projection corresponding to the original projection on top.
+   *
+   * @param defaultExpr expression to be used in the projection if no fields
+   *                    or special columns are selected; may be null
+   * @return the converted projection if it makes sense to push elements of
+   * the projection; otherwise returns null
+   */
+  public RelNode convertProject(RexNode defaultExpr) {
+    // locate all fields referenced in the projection and filter
+    locateAllRefs(); // called for its side effects; the boolean result is unused
+
+    // if all columns are being selected (either explicitly in the
+    // projection or via a "select *"), then there need to be some
+    // special expressions to preserve in the projection; otherwise,
+    // there's no point in proceeding any further
+    if (origProj == null) {
+      if (childPreserveExprs.size() == 0) {
+        return null;
+      }
+
+      // even though there is no projection, this is the same as
+      // selecting all fields
+      if (nChildFields > 0) {
+        // Calling with nChildFields == 0 should be safe but hits
+        // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6222207
+        projRefs.set(0, nChildFields);
+      }
+      nProject = nChildFields;
+    } else if (
+        (projRefs.cardinality() == nChildFields)
+            && (childPreserveExprs.size() == 0)) {
+      return null;
+    }
+
+    // if nothing is being selected from the underlying rel, just
+    // project the default expression passed in as a parameter or the
+    // first column if there is no default expression
+    if ((projRefs.cardinality() == 0) && (childPreserveExprs.size() == 0)) {
+      if (defaultExpr != null) {
+        childPreserveExprs.add(defaultExpr);
+      } else if (nChildFields == 1) {
+        return null; // projecting the only column would not narrow anything
+      } else {
+        projRefs.set(0);
+        nProject = 1;
+      }
+    }
+
+    // create a new projection referencing all fields referenced in
+    // either the project or the filter
+    RelNode newProject = createProjectRefsAndExprs(childRel, false, false);
+
+    int[] adjustments = getAdjustments();
+
+    // if a filter was passed in, convert it to reference the projected
+    // columns, placing it on top of the project just created
+    RelNode projChild;
+    if (origFilter != null) {
+      RexNode newFilter =
+          convertRefsAndExprs(
+              origFilter,
+              newProject.getRowType().getFieldList(),
+              adjustments);
+      relBuilder.push(newProject);
+      relBuilder.filter(newFilter);
+      projChild = relBuilder.build();
+    } else {
+      projChild = newProject;
+    }
+
+    // put the original project on top of the filter/project, converting
+    // it to reference the modified projection list; otherwise, create
+    // a projection that essentially selects all fields
+    return createNewProject(projChild, adjustments);
+  }
+
+  /**
+   * Locates all references found in either the projection expressions or a
+   * filter, as well as references to expressions that should be preserved.
+   * Based on that, determines whether pushing the projection makes sense.
+   *
+   * @return true if all inputs from the child that the projection is being
+   * pushed past are referenced in the projection/filter and no special
+   * preserve expressions are referenced; in that case, it does not make sense
+   * to push the projection
+   */
+  public boolean locateAllRefs() {
+    RexUtil.apply(
+        new InputSpecialOpFinder(
+            projRefs,
+            childBitmap,
+            rightBitmap,
+            strongBitmap,
+            preserveExprCondition,
+            childPreserveExprs,
+            rightPreserveExprs),
+        origProjExprs,
+        origFilter);
+
+    // The system fields of each child are always used by the join, even if
+    // they are not projected out of it.
+    projRefs.set(
+        nSysFields,
+        nSysFields + nSysFields,
+        true);
+    projRefs.set(
+        nSysFields + nFields,
+        nSysFields + nFields + nSysFields,
+        true);
+
+    // Count how many fields are projected, split by system/left/right range.
+    nSystemProject = 0;
+    nProject = 0;
+    nRightProject = 0;
+    for (int bit : BitSets.toIter(projRefs)) {
+      if (bit < nSysFields) {
+        nSystemProject++;
+      } else if (bit < nSysFields + nFields) {
+        nProject++;
+      } else {
+        nRightProject++;
+      }
+    }
+
+    assert nSystemProject + nProject + nRightProject
+        == projRefs.cardinality();
+
+    if ((childRel instanceof Join)
+        || (childRel instanceof SetOp)) {
+      // if nothing is projected from the children, arbitrarily project
+      // the first columns; this is necessary since Fennel doesn't
+      // handle 0-column projections
+      if ((nProject == 0) && (childPreserveExprs.size() == 0)) {
+        projRefs.set(0);
+        nProject = 1;
+      }
+      if (childRel instanceof Join) {
+        if ((nRightProject == 0) && (rightPreserveExprs.size() == 0)) {
+          projRefs.set(nFields); // NOTE(review): index is not offset by nSysFields - confirm
+          nRightProject = 1;
+        }
+      }
+    }
+
+    // no need to push projections if all children fields are being
+    // referenced and there are no special preserve expressions; note
+    // that we need to do this check after we've handled the 0-column
+    // project cases
+    if (projRefs.cardinality() == nChildFields
+        && childPreserveExprs.size() == 0
+        && rightPreserveExprs.size() == 0) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Creates a projection based on the inputs specified in a bitmap and the
+   * expressions that need to be preserved. The expressions are appended after
+   * the input references.
+   *
+   * @param projChild child that the projection will be created on top of
+   * @param adjust    if true, the preserved expressions are rewritten to
+   *                  reference the child's fields; otherwise, they are reused
+   * @param rightSide if true, creating a projection for the right hand side
+   *                  of a join
+   * @return created projection
+   */
+  public Project createProjectRefsAndExprs(
+      RelNode projChild,
+      boolean adjust,
+      boolean rightSide) {
+    List<RexNode> preserveExprs;
+    int nInputRefs;
+    int offset; // index in the combined row of the first field of the chosen side
+
+    if (rightSide) {
+      preserveExprs = rightPreserveExprs;
+      nInputRefs = nRightProject;
+      offset = nSysFields + nFields;
+    } else {
+      preserveExprs = childPreserveExprs;
+      nInputRefs = nProject;
+      offset = nSysFields;
+    }
+    int refIdx = offset - 1; // so that the first nextSetBit(refIdx + 1) starts at offset
+    List<Pair<RexNode, String>> newProjects =
+        new ArrayList<Pair<RexNode, String>>();
+    List<RelDataTypeField> destFields =
+        projChild.getRowType().getFieldList();
+
+    // add on the input references, keeping the name of the referenced field
+    for (int i = 0; i < nInputRefs; i++) {
+      refIdx = projRefs.nextSetBit(refIdx + 1);
+      assert refIdx >= 0;
+      final RelDataTypeField destField = destFields.get(refIdx - offset);
+      newProjects.add(
+          Pair.of(
+              (RexNode) rexBuilder.makeInputRef(
+                  destField.getType(), refIdx - offset),
+              destField.getName()));
+    }
+
+    // add on the expressions that need to be preserved, converting the
+    // arguments to reference the projected columns (if necessary)
+    int[] adjustments = {};
+    if ((preserveExprs.size() > 0) && adjust) {
+      adjustments = new int[childFields.size()];
+      for (int idx = offset; idx < childFields.size(); idx++) {
+        adjustments[idx] = -offset; // shift references back by the side's offset
+      }
+    }
+    for (RexNode projExpr : preserveExprs) {
+      RexNode newExpr;
+      if (adjust) {
+        newExpr =
+            projExpr.accept(
+                new RelOptUtil.RexInputConverter(
+                    rexBuilder,
+                    childFields,
+                    destFields,
+                    adjustments));
+      } else {
+        newExpr = projExpr;
+      }
+      newProjects.add(
+		 Pair.of(
+              newExpr,
+              null)); // null name: the preserved expression is not named after its operator
+    }
+
+    return (Project) RelOptUtil.createProject(
+        projChild,
+        Pair.left(newProjects),
+        Pair.right(newProjects),
+        false,
+        relBuilder);
+  }
+
+  /**
+   * Determines how much each input reference needs to be adjusted as a result
+   * of projection. Entries for fields not set in {@code projRefs} stay 0.
+   *
+   * @return array indicating how much each input needs to be adjusted by
+   */
+  public int[] getAdjustments() {
+    int[] adjustments = new int[nChildFields];
+    int newIdx = 0; // position of the current field among the projected fields
+    int rightOffset = childPreserveExprs.size(); // right-side refs also skip the left preserved exprs
+    for (int pos : BitSets.toIter(projRefs)) {
+      adjustments[pos] = -(pos - newIdx);
+      if (pos >= nSysFields + nFields) {
+        adjustments[pos] += rightOffset;
+      }
+      newIdx++;
+    }
+    return adjustments;
+  }
+
+  /**
+   * Walks an expression tree with a {@code RefAndExprConverter}, adjusting
+   * each RexInputRef index by some amount, and converting expressions that
+   * need to be preserved to field references.
+   *
+   * @param rex         the expression
+   * @param destFields  fields that the new expressions will be referencing
+   * @param adjustments the amount each input reference index needs to be
+   *                    adjusted by
+   * @return modified expression tree
+   */
+  public RexNode convertRefsAndExprs(
+      RexNode rex,
+      List<RelDataTypeField> destFields,
+      int[] adjustments) {
+    return rex.accept(
+        new RefAndExprConverter(
+            rexBuilder,
+            childFields,
+            destFields,
+            adjustments,
+            childPreserveExprs,
+            nProject, // output index of the first left preserved expression
+            rightPreserveExprs,
+            nProject + childPreserveExprs.size() + nRightProject)); // same, for the right side
+  }
+
+  /**
+   * Creates a new projection based on the original projection, adjusting all
+   * input refs using an adjustment array passed in. If there was no original
+   * projection, creates a new one that selects every field from the
+   * underlying rel.
+   *
+   * <p>If the resulting projection would be trivial, returns the child.
+   *
+   * @param projChild   child of the new project
+   * @param adjustments array indicating how much each input reference should
+   *                    be adjusted by
+   * @return the created projection
+   */
+  public RelNode createNewProject(RelNode projChild, int[] adjustments) {
+    final List<Pair<RexNode, String>> projects = Lists.newArrayList();
+
+    if (origProj != null) {
+      for (Pair<RexNode, String> p : origProj.getNamedProjects()) {
+        projects.add(
+            Pair.of(
+                convertRefsAndExprs(
+                    p.left,
+                    projChild.getRowType().getFieldList(),
+                    adjustments),
+                p.right)); // keep the original output name
+      }
+    } else {
+      for (Ord<RelDataTypeField> field : Ord.zip(childFields)) {
+        projects.add(
+            Pair.of(
+                (RexNode) rexBuilder.makeInputRef(
+                    field.e.getType(), field.i), field.e.getName()));
+      }
+    }
+    return RelOptUtil.createProject(
+        projChild,
+        Pair.left(projects),
+        Pair.right(projects),
+        true /* optimize to avoid trivial projections, as per javadoc */,
+        relBuilder);
+  }
+
+  //~ Inner Classes ----------------------------------------------------------
+
+  /**
+   * Visitor which builds a bitmap of the inputs used by an expression, as
+   * well as locating expressions corresponding to special operators.
+   */
+  private class InputSpecialOpFinder extends RexVisitorImpl<Void> {
+    private final BitSet rexRefs;
+    private final ImmutableBitSet leftFields;
+    private final ImmutableBitSet rightFields;
+    private final ImmutableBitSet strongFields;
+    private final ExprCondition preserveExprCondition;
+    private final List<RexNode> preserveLeft;
+    private final List<RexNode> preserveRight;
+    private final Strong strong;
+
+    public InputSpecialOpFinder(
+        BitSet rexRefs,
+        ImmutableBitSet leftFields,
+        ImmutableBitSet rightFields,
+        final ImmutableBitSet strongFields,
+        ExprCondition preserveExprCondition,
+        List<RexNode> preserveLeft,
+        List<RexNode> preserveRight) {
+      super(true);
+      this.rexRefs = rexRefs;
+      this.leftFields = leftFields;
+      this.rightFields = rightFields;
+      this.preserveExprCondition = preserveExprCondition;
+      this.preserveLeft = preserveLeft;
+      this.preserveRight = preserveRight;
+
+      this.strongFields = strongFields;
+      this.strong = Strong.of(strongFields);
+    }
+
+    public Void visitCall(RexCall call) {
+      if (preserve(call)) {
+        return null; // preserved as a whole: its operands are not visited
+      }
+      super.visitCall(call);
+      return null;
+    }
+
+    private boolean isStrong(final ImmutableBitSet exprArgs, final RexNode call) {
+      // If the expression does not use any of the inputs that require output to be null,
+      // no need to check.  Otherwise, check that the expression is null.
+      // For example, in a "left outer join", we don't require expressions
+      // pushed down into the left input to be strong.  On the other hand,
+      // expressions pushed into the right input must be.  In that case,
+      // strongFields == right input fields.
+      return !strongFields.intersects(exprArgs) || strong.isNull(call);
+    }
+
+    private boolean preserve(RexNode call) {
+      if (preserveExprCondition.test(call)) {
+        // if the arguments of the expression only reference the
+        // left hand side, preserve it on the left; similarly, if
+        // it only references expressions on the right
+        final ImmutableBitSet exprArgs = RelOptUtil.InputFinder.bits(call);
+        if (exprArgs.cardinality() > 0) {
+          if (leftFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+            addExpr(preserveLeft, call);
+            return true;
+          } else if (rightFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+            assert preserveRight != null;
+            addExpr(preserveRight, call);
+            return true;
+          }
+        }
+        // if the expression arguments reference both the left and
+        // right, fall through and don't attempt to preserve the
+        // expression, but instead locate references and special
+        // ops in the call operands
+      }
+      return false;
+    }
+
+    public Void visitInputRef(RexInputRef inputRef) {
+      rexRefs.set(inputRef.getIndex()); // record that this input field is referenced
+      return null;
+    }
+
+    /**
+     * Adds an expression to a list if the same expression isn't already in
+     * the list. Expressions are identical if their toString() values are equal.
+     *
+     * @param exprList current list of expressions
+     * @param newExpr  new expression to be added
+     */
+    private void addExpr(List<RexNode> exprList, RexNode newExpr) {
+      String newExprString = newExpr.toString();
+      for (RexNode expr : exprList) {
+        if (newExprString.compareTo(expr.toString()) == 0) {
+          return;
+        }
+      }
+      exprList.add(newExpr);
+    }
+  }
+
+  /**
+   * Walks an expression tree, replacing input refs with new values to reflect
+   * projection and converting special expressions to field references.
+   */
+  private class RefAndExprConverter extends RelOptUtil.RexInputConverter {
+    private final List<RexNode> preserveLeft;
+    private final int firstLeftRef;
+    private final List<RexNode> preserveRight;
+    private final int firstRightRef;
+
+    public RefAndExprConverter(
+        RexBuilder rexBuilder,
+        List<RelDataTypeField> srcFields,
+        List<RelDataTypeField> destFields,
+        int[] adjustments,
+        List<RexNode> preserveLeft,
+        int firstLeftRef,
+        List<RexNode> preserveRight,
+        int firstRightRef) {
+      super(rexBuilder, srcFields, destFields, adjustments);
+      this.preserveLeft = preserveLeft;
+      this.firstLeftRef = firstLeftRef;
+      this.preserveRight = preserveRight;
+      this.firstRightRef = firstRightRef;
+    }
+
+    public RexNode visitCall(RexCall call) {
+      // if the expression corresponds to one that needs to be preserved,
+      // convert it to a field reference; otherwise, convert the entire
+      // expression
+      int match =
+          findExprInLists(
+              call,
+              preserveLeft,
+              firstLeftRef,
+              preserveRight,
+              firstRightRef);
+      if (match >= 0) {
+        return rexBuilder.makeInputRef(
+            destFields.get(match).getType(),
+            match);
+      }
+      return super.visitCall(call);
+    }
+
+    /**
+     * Looks for a matching RexNode from among two lists of RexNodes and
+     * returns the offset into the list corresponding to the match, adjusted
+     * by an amount, depending on whether the match was from the first or
+     * second list. The first list is searched first.
+     *
+     * @param rex      RexNode that is being matched against
+     * @param rexList1 first list of RexNodes
+     * @param adjust1  adjustment if match occurred in first list
+     * @param rexList2 second list of RexNodes; may be null
+     * @param adjust2  adjustment if match occurred in the second list
+     * @return adjusted index of the matching RexNode (list position plus the
+     * adjustment of the list it was found in); -1 if no match
+     */
+    private int findExprInLists(
+        RexNode rex,
+        List<RexNode> rexList1,
+        int adjust1,
+        List<RexNode> rexList2,
+        int adjust2) {
+      int match = findExprInList(rex, rexList1);
+      if (match >= 0) {
+        return match + adjust1;
+      }
+
+      if (rexList2 != null) {
+        match = findExprInList(rex, rexList2);
+        if (match >= 0) {
+          return match + adjust2;
+        }
+      }
+
+      return -1;
+    }
+
+    private int findExprInList(RexNode rex, List<RexNode> rexList) {
+      int match = 0; // running position within rexList
+      for (RexNode rexElement : rexList) {
+        if (rexElement.toString().compareTo(rex.toString()) == 0) { // equal string forms
+          return match;
+        }
+        match++;
+      }
+      return -1;
+    }
+  }
+
+  /**
+   * A functor that replies true or false for a given expression; also a {@link Predicate}.
+   *
+   * @see org.apache.calcite.rel.rules.PushProjector.OperatorExprCondition
+   */
+  public interface ExprCondition extends Predicate<RexNode> {
+    /**
+     * Evaluates a condition for a given expression.
+     *
+     * @param expr Expression
+     * @return result of evaluating the condition
+     */
+    boolean test(RexNode expr);
+
+    /**
+     * Constant condition that replies {@code false} for all expressions.
+     */
+    ExprCondition FALSE =
+        new ExprConditionImpl() {
+          @Override public boolean test(RexNode expr) {
+            return false;
+          }
+        };
+
+    /**
+     * Constant condition that replies {@code true} for all expressions.
+     */
+    ExprCondition TRUE =
+        new ExprConditionImpl() {
+          @Override public boolean test(RexNode expr) {
+            return true;
+          }
+        };
+  }
+
+  /** Abstract base for {@link ExprCondition} implementations, built on {@link PredicateImpl}. */
+  abstract static class ExprConditionImpl extends PredicateImpl<RexNode>
+      implements ExprCondition {
+  }
+
+  /**
+   * An expression condition that evaluates to true if the expression is
+   * a call to one of a set of operators.
+   */
+  class OperatorExprCondition extends ExprConditionImpl { // NOTE(review): non-static inner class
+    private final Set<SqlOperator> operatorSet;
+
+    /**
+     * Creates an OperatorExprCondition.
+     *
+     * @param operatorSet Set of operators; copied into an immutable set
+     */
+    public OperatorExprCondition(Iterable<? extends SqlOperator> operatorSet) {
+      this.operatorSet = ImmutableSet.copyOf(operatorSet);
+    }
+
+    public boolean test(RexNode expr) {
+      return expr instanceof RexCall // anything that is not a call never matches
+          && operatorSet.contains(((RexCall) expr).getOperator());
+    }
+  }
+}
+
+// End PushProjector.java

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
deleted file mode 100644
index a57cf10..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-/*
- * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1761 IS FIXED.
- */
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/**
- * SQL function that computes keys by which rows can be partitioned and
- * aggregated.
- *
- * <p>Grouped window functions always occur in the GROUP BY clause. They often
- * have auxiliary functions that access information about the group. For
- * example, {@code HOP} is a group function, and its auxiliary functions are
- * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
- * query:
- *
- * <blockquote><pre>
- * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
- *   HOP_END(rowtime, INTERVAL '1' HOUR),
- *   MIN(unitPrice)
- * FROM Orders
- * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
- * </pre></blockquote>
- */
-class SqlGroupFunction extends SqlFunction {
-	/** The grouped function, if this an auxiliary function; null otherwise. */
-	final SqlGroupFunction groupFunction;
-
-	/** Creates a SqlGroupFunction.
-	 *
-	 * @param kind Kind; also determines function name
-	 * @param groupFunction Group function, if this is an auxiliary;
-	 *                      null, if this is a group function
-	 * @param operandTypeChecker Operand type checker
-	 */
-	SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction,
-		SqlOperandTypeChecker operandTypeChecker) {
-		super(kind.name(), kind, ReturnTypes.ARG0, null,
-			operandTypeChecker, SqlFunctionCategory.SYSTEM);
-		this.groupFunction = groupFunction;
-		if (groupFunction != null) {
-			assert groupFunction.groupFunction == null;
-		}
-	}
-
-	/** Creates an auxiliary function from this grouped window function. */
-	SqlGroupFunction auxiliary(SqlKind kind) {
-		return new SqlGroupFunction(kind, this, getOperandTypeChecker());
-	}
-
-	/** Returns a list of this grouped window function's auxiliary functions. */
-	List<SqlGroupFunction> getAuxiliaryFunctions() {
-		return ImmutableList.of();
-	}
-
-	@Override public boolean isGroup() {
-		// Auxiliary functions are not group functions
-		return groupFunction == null;
-	}
-
-	@Override public boolean isGroupAuxiliary() {
-		return groupFunction != null;
-	}
-
-	@Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
-		// Monotonic iff its first argument is, but not strict.
-		//
-		// Note: This strategy happens to works for all current group functions
-		// (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
-		// make the method abstract.
-		return call.getOperandMonotonicity(0).unstrict();
-	}
-}
-
-// End SqlGroupFunction.java