You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/08/08 10:58:31 UTC
[5/5] flink git commit: [FLINK-6429] [table] Bump Calcite version to
1.13.
[FLINK-6429] [table] Bump Calcite version to 1.13.
This closes #4373.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5770fe8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5770fe8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5770fe8
Branch: refs/heads/master
Commit: d5770fe8dd1486d457c87c17a7df8dba276e9bcd
Parents: f471888
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jul 19 14:34:37 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 8 12:56:34 2017 +0200
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 2 +-
.../calcite/avatica/util/DateTimeUtils.java | 1044 ++++
.../apache/calcite/rel/rules/PushProjector.java | 868 +++
.../calcite/sql/fun/SqlGroupFunction.java | 103 -
.../calcite/sql/fun/SqlStdOperatorTable.java | 2133 -------
.../apache/calcite/sql/validate/AggChecker.java | 225 -
.../sql/validate/SqlUserDefinedAggFunction.java | 82 -
.../calcite/sql2rel/SqlToRelConverter.java | 5356 ------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 2 +-
.../flink/table/calcite/FlinkTypeSystem.scala | 2 +-
.../table/catalog/ExternalCatalogSchema.scala | 2 +
.../flink/table/codegen/CodeGenerator.scala | 3 +
.../apache/flink/table/expressions/call.scala | 1 +
.../apache/flink/table/expressions/time.scala | 44 +-
.../table/functions/utils/AggSqlFunction.scala | 1 +
.../flink/table/plan/rules/FlinkRuleSets.scala | 4 +-
.../flink/table/plan/stats/FlinkStatistic.scala | 5 +-
.../table/plan/util/RexProgramExtractor.scala | 3 +
.../table/api/batch/sql/CorrelateTest.scala | 24 +-
.../table/api/batch/table/CorrelateTest.scala | 6 +-
.../table/api/stream/sql/CorrelateTest.scala | 24 +-
.../table/api/stream/table/CorrelateTest.scala | 24 +-
.../table/expressions/ScalarFunctionsTest.scala | 5 +-
.../plan/TimeIndicatorConversionTest.scala | 2 +-
24 files changed, 2002 insertions(+), 7963 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 8a7e3ac..0e943ad 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -52,7 +52,7 @@ under the License.
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
- <version>1.12.0</version>
+ <version>1.13.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
new file mode 100644
index 0000000..d1a87a7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import java.text.DateFormat;
+import java.text.NumberFormat;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1884 IS FIXED.
+ */
+
+/**
+ * Utility functions for datetime types: date, time, timestamp.
+ *
+ * <p>Used by the JDBC driver.
+ *
+ * <p>TODO: review methods for performance. Due to allocations required, it may
+ * be preferable to introduce a "formatter" with the required state.
+ */
+public class DateTimeUtils {
+ /** The julian date of the epoch, 1970-01-01. */
+ public static final int EPOCH_JULIAN = 2440588;
+
+ private DateTimeUtils() {}
+
+ //~ Static fields/initializers ---------------------------------------------
+
+ /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+ public static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+ /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+ public static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+ /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+ public static final String TIMESTAMP_FORMAT_STRING =
+ DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
+ /** The GMT time zone.
+ *
+ * @deprecated Use {@link #UTC_ZONE} */
+ @Deprecated // to be removed before 2.0
+ public static final TimeZone GMT_ZONE = TimeZone.getTimeZone("GMT");
+
+ /** The UTC time zone. */
+ public static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
+
+ /** The Java default time zone.
+ *
+ * <p>The value is read from {@link TimeZone#getDefault()} once, when this
+ * class is initialized, and is not refreshed afterwards. */
+ public static final TimeZone DEFAULT_ZONE = TimeZone.getDefault();
+
+ /**
+ * The number of milliseconds in a second.
+ */
+ public static final long MILLIS_PER_SECOND = 1000L;
+
+ /**
+ * The number of milliseconds in a minute.
+ */
+ public static final long MILLIS_PER_MINUTE = 60000L;
+
+ /**
+ * The number of milliseconds in an hour.
+ */
+ public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
+
+ /**
+ * The number of milliseconds in a day.
+ *
+ * <p>This is the modulo 'mask' used when converting
+ * TIMESTAMP values to DATE and TIME values.
+ */
+ public static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+
+ /**
+ * Calendar set to the epoch (1970-01-01 00:00:00 UTC). Useful for
+ * initializing other values.
+ *
+ * <p>Calendars are mutable and this instance is shared by every user of the
+ * class, so callers must never modify it.
+ */
+ public static final Calendar ZERO_CALENDAR;
+
+ static {
+ // Built in the UTC zone and root locale, then pinned to epoch millisecond 0.
+ ZERO_CALENDAR = Calendar.getInstance(DateTimeUtils.UTC_ZONE, Locale.ROOT);
+ ZERO_CALENDAR.setTimeInMillis(0);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ /**
+ * Parses a string using the given {@link DateFormat}, starting at the
+ * supplied parse position. If successful, the parse position is updated to
+ * the index after the last character used.
+ *
+ * <p>The parsing is strict: the format is switched to non-lenient mode, so
+ * out-of-range fields are rejected. Note that this method modifies
+ * {@code dateFormat}: it replaces the format's calendar and disables
+ * leniency.
+ *
+ * @param s string to be parsed
+ * @param dateFormat Date format
+ * @param tz time zone in which to interpret string. Defaults to the Java
+ * default time zone
+ * @param pp position to start parsing from
+ * @return a Calendar initialized with the parsed value, or null if parsing
+ * failed. If returned, the Calendar is configured to the UTC time zone.
+ */
+ private static Calendar parseDateFormat(String s, DateFormat dateFormat,
+ TimeZone tz, ParsePosition pp) {
+ if (tz == null) {
+ tz = DEFAULT_ZONE;
+ }
+ // The format parses into a calendar in the requested zone.
+ Calendar ret = Calendar.getInstance(tz, Locale.ROOT);
+ dateFormat.setCalendar(ret);
+ dateFormat.setLenient(false);
+
+ final Date d = dateFormat.parse(s, pp);
+ if (null == d) {
+ return null;
+ }
+ // Keep the parsed instant, but hand the calendar back in UTC.
+ ret.setTime(d);
+ ret.setTimeZone(UTC_ZONE);
+ return ret;
+ }
+
+ /**
+ * Parses a string using a {@link SimpleDateFormat} built from the given
+ * pattern in the root locale; delegates to
+ * {@link #parseDateFormat(String, DateFormat, TimeZone)}.
+ *
+ * @param s string to be parsed
+ * @param pattern {@link SimpleDateFormat} pattern
+ * @param tz time zone in which to interpret string
+ * @return the result of the delegate call
+ */
+ @Deprecated // to be removed before 2.0
+ public static Calendar parseDateFormat(String s, String pattern,
+ TimeZone tz) {
+ return parseDateFormat(s, new SimpleDateFormat(pattern, Locale.ROOT), tz);
+ }
+
+ /**
+  * Parses a complete string with the given {@link DateFormat}. Unlike the
+  * position-based variant, the whole input has to be consumed by the format;
+  * trailing characters make the parse fail.
+  *
+  * @param s string to be parsed
+  * @param dateFormat Date format
+  * @param tz time zone in which to interpret string. Defaults to the Java
+  *           default time zone
+  * @return a Calendar initialized with the parsed value and configured to the
+  *         UTC time zone, or null if parsing failed or did not consume the
+  *         entire string
+  */
+ public static Calendar parseDateFormat(String s, DateFormat dateFormat,
+     TimeZone tz) {
+   final ParsePosition position = new ParsePosition(0);
+   final Calendar parsed = parseDateFormat(s, dateFormat, tz, position);
+   // Anything left over after the parse position means the input did not
+   // fully match the format.
+   final boolean consumedAll = position.getIndex() == s.length();
+   return consumedAll ? parsed : null;
+ }
+
+ /**
+ * Parses a date/time literal using a {@link SimpleDateFormat} built from the
+ * given pattern in the root locale, with a maximum precision of 3; delegates
+ * to {@link #parsePrecisionDateTimeLiteral(String, DateFormat, TimeZone, int)}.
+ *
+ * @param s string to be parsed
+ * @param pattern {@link SimpleDateFormat} pattern; must not be null
+ * @param tz time zone in which to interpret string
+ * @return the result of the delegate call
+ */
+ @Deprecated // to be removed before 2.0
+ public static PrecisionTime parsePrecisionDateTimeLiteral(
+ String s,
+ String pattern,
+ TimeZone tz) {
+ assert pattern != null;
+ return parsePrecisionDateTimeLiteral(s,
+ new SimpleDateFormat(pattern, Locale.ROOT), tz, 3);
+ }
+
+ /**
+ * Parses a string using the given {@link DateFormat} and, if present, a
+ * fractional seconds component. The fractional seconds component must begin
+ * with a decimal point ('.') followed only by numeric digits.
+ *
+ * <p>When {@code maxPrecision} is non-negative, fraction digits beyond it
+ * are dropped (truncated, not rounded). At most the first 3 remaining digits
+ * contribute milliseconds to the returned calendar.
+ *
+ * @param s string to be parsed
+ * @param dateFormat Date format
+ * @param tz time zone in which to interpret string. Defaults to the
+ * local time zone
+ * @param maxPrecision maximum number of fraction digits to keep, or a
+ * negative value to keep all of them
+ * @return a {@link DateTimeUtils.PrecisionTime PrecisionTime} initialized
+ * with the parsed value, or null if parsing failed. The PrecisionTime
+ * contains a UTC Calendar, the retained fraction digits and a precision.
+ */
+ public static PrecisionTime parsePrecisionDateTimeLiteral(String s,
+ DateFormat dateFormat, TimeZone tz, int maxPrecision) {
+ final ParsePosition pp = new ParsePosition(0);
+ final Calendar cal = parseDateFormat(s, dateFormat, tz, pp);
+ if (cal == null) {
+ return null; // Invalid date/time format
+ }
+
+ // Note: the Java SimpleDateFormat 'S' treats any number after
+ // the decimal as milliseconds. That means 12:00:00.9 has 9
+ // milliseconds and 12:00:00.9999 has 9999 milliseconds.
+ // The fraction is therefore parsed by hand below.
+ int p = 0;
+ String secFraction = "";
+ if (pp.getIndex() < s.length()) {
+ // Check to see if rest is decimal portion
+ if (s.charAt(pp.getIndex()) != '.') {
+ return null;
+ }
+
+ // Skip decimal sign
+ pp.setIndex(pp.getIndex() + 1);
+
+ // Parse decimal portion
+ if (pp.getIndex() < s.length()) {
+ secFraction = s.substring(pp.getIndex());
+ if (!secFraction.matches("\\d+")) {
+ return null;
+ }
+ // Parsing advances pp to the end of the digits; the numeric value itself
+ // is only checked for null.
+ NumberFormat nf = NumberFormat.getIntegerInstance(Locale.ROOT);
+ Number num = nf.parse(s, pp);
+ if ((num == null) || (pp.getIndex() != s.length())) {
+ // Invalid decimal portion
+ return null;
+ }
+
+ // Determine precision - only the first 3 digits (milliseconds)
+ // affect the calendar. Further digits are quietly dropped.
+ p = secFraction.length();
+ if (maxPrecision >= 0) {
+ // If there is a maximum precision, ignore subsequent digits
+ p = Math.min(maxPrecision, p);
+ secFraction = secFraction.substring(0, p);
+ }
+
+ // Calculate milliseconds: truncate or right-pad with zeros to exactly 3 digits
+ String millis = secFraction;
+ if (millis.length() > 3) {
+ millis = secFraction.substring(0, 3);
+ }
+ while (millis.length() < 3) {
+ millis = millis + "0";
+ }
+
+ int ms = Integer.valueOf(millis);
+ cal.add(Calendar.MILLISECOND, ms);
+ }
+ }
+
+ assert pp.getIndex() == s.length();
+ return new PrecisionTime(cal, secFraction, p);
+ }
+
+ /**
+  * Returns the time zone of the given calendar, falling back to
+  * {@link #DEFAULT_ZONE} when no calendar is supplied.
+  *
+  * @param cal calendar to inspect; may be null
+  * @return the calendar's time zone, or the default zone if {@code cal} is null
+  */
+ public static TimeZone getTimeZone(Calendar cal) {
+   return cal != null ? cal.getTimeZone() : DEFAULT_ZONE;
+ }
+
+ /**
+ * Checks if the date/time format is valid by constructing (and discarding)
+ * a {@link SimpleDateFormat} for it in the root locale.
+ *
+ * @param pattern {@link SimpleDateFormat} pattern
+ * @throws IllegalArgumentException if the given pattern is invalid
+ */
+ public static void checkDateFormat(String pattern) {
+ new SimpleDateFormat(pattern, Locale.ROOT);
+ }
+
+ /**
+  * Creates a strict date formatter for the given pattern in the root locale.
+  * Leniency is switched off, so values such as day 0 or month 13 are
+  * rejected when parsing.
+  *
+  * @param format {@link SimpleDateFormat} pattern
+  * @return a new, non-lenient formatter
+  */
+ public static SimpleDateFormat newDateFormat(String format) {
+   final SimpleDateFormat strictFormat =
+       new SimpleDateFormat(format, Locale.ROOT);
+   strictFormat.setLenient(false);
+   return strictFormat;
+ }
+
+ /** Helper for CAST({timestamp} AS VARCHAR(n)).
+ *
+ * <p>Formats with precision 0, i.e. without fractional seconds.
+ *
+ * @param timestamp milliseconds since the epoch
+ * @return the formatted timestamp */
+ public static String unixTimestampToString(long timestamp) {
+ return unixTimestampToString(timestamp, 0);
+ }
+
+ /**
+  * Formats a timestamp as a date, a space, and a time with the requested
+  * number of fractional-second digits.
+  *
+  * @param timestamp milliseconds since the epoch
+  * @param precision number of fractional-second digits to print
+  * @return the formatted timestamp
+  */
+ public static String unixTimestampToString(long timestamp, int precision) {
+   int days = (int) (timestamp / MILLIS_PER_DAY);
+   int millisOfDay = (int) (timestamp % MILLIS_PER_DAY);
+   if (millisOfDay < 0) {
+     // Division truncates towards zero; shift to the previous day so that the
+     // time-of-day part is non-negative.
+     days -= 1;
+     millisOfDay = (int) (millisOfDay + MILLIS_PER_DAY);
+   }
+   final StringBuilder out = new StringBuilder(17);
+   unixDateToString(out, days);
+   out.append(' ');
+   unixTimeToString(out, millisOfDay, precision);
+   return out.toString();
+ }
+
+ /** Helper for CAST({time} AS VARCHAR(n)).
+ *
+ * <p>Formats with precision 0, i.e. without fractional seconds.
+ *
+ * @param time milliseconds since midnight
+ * @return the formatted time */
+ public static String unixTimeToString(int time) {
+ return unixTimeToString(time, 0);
+ }
+
+ /** Formats a time value as hours, minutes and seconds separated by colons,
+ * followed by {@code precision} fractional-second digits when
+ * {@code precision} is positive.
+ *
+ * @param time milliseconds since midnight
+ * @param precision number of fractional-second digits to print
+ * @return the formatted time */
+ public static String unixTimeToString(int time, int precision) {
+ final StringBuilder buf = new StringBuilder(8);
+ unixTimeToString(buf, time, precision);
+ return buf.toString();
+ }
+
+ private static void unixTimeToString(StringBuilder buf, int time,
+ int precision) {
+ int h = time / 3600000;
+ int time2 = time % 3600000;
+ int m = time2 / 60000;
+ int time3 = time2 % 60000;
+ int s = time3 / 1000;
+ int ms = time3 % 1000;
+ int2(buf, h);
+ buf.append(':');
+ int2(buf, m);
+ buf.append(':');
+ int2(buf, s);
+ if (precision > 0) {
+ buf.append('.');
+ while (precision > 0) {
+ buf.append((char) ('0' + (ms / 100)));
+ ms = ms % 100;
+ ms = ms * 10;
+ --precision;
+ }
+ }
+ }
+
+ private static void int2(StringBuilder buf, int i) {
+ buf.append((char) ('0' + (i / 10) % 10));
+ buf.append((char) ('0' + i % 10));
+ }
+
+ /** Appends the four lowest decimal digits of {@code i}, most significant
+  * first, to the buffer. */
+ private static void int4(StringBuilder buf, int i) {
+   for (int divisor = 1000; divisor >= 1; divisor /= 10) {
+     buf.append((char) ('0' + (i / divisor) % 10));
+   }
+ }
+
+ /** Helper for CAST({date} AS VARCHAR(n)).
+ *
+ * @param date number of days since the epoch
+ * @return the date formatted as year, month and day separated by hyphens */
+ public static String unixDateToString(int date) {
+ final StringBuilder buf = new StringBuilder(10);
+ unixDateToString(buf, date);
+ return buf.toString();
+ }
+
+ /** Appends a date, given as days since the epoch, to the buffer by
+ * converting it to a julian day number first. */
+ private static void unixDateToString(StringBuilder buf, int date) {
+ julianToString(buf, date + EPOCH_JULIAN);
+ }
+
+ /** Appends the calendar date for a julian day number to the buffer as a
+ * four-digit year, two-digit month and two-digit day separated by hyphens. */
+ private static void julianToString(StringBuilder buf, int julian) {
+ // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+ int b, c;
+ // Day numbers above 2299160 get the century correction (b);
+ // earlier ones use the simpler branch without it.
+ if (julian > 2299160) {
+ int a = julian + 32044;
+ b = (4 * a + 3) / 146097;
+ c = a - b *146097 / 4;
+ } else {
+ b = 0;
+ c = julian + 32082;
+ }
+ int d = (4 * c + 3) / 1461;
+ int e = c - (1461 * d) / 4;
+ int m = (5 * e + 2) / 153;
+ int day = e - (153 * m + 2) / 5 + 1;
+ int month = m + 3 - 12 * (m / 10);
+ int year = b * 100 + d - 4800 + (m / 10);
+
+ int4(buf, year);
+ buf.append('-');
+ int2(buf, month);
+ buf.append('-');
+ int2(buf, day);
+ }
+
+ /**
+ * Formats a year-month interval as a signed string.
+ *
+ * <p>The output starts with '+' or '-'. For {@code YEAR} the value is first
+ * rounded to a multiple of 12 months and printed as years; for
+ * {@code YEAR_TO_MONTH} it is printed as years, a hyphen and a two-digit
+ * month; for {@code MONTH} the month count is printed as is.
+ *
+ * @param v interval value in months
+ * @param range one of YEAR, YEAR_TO_MONTH or MONTH
+ * @return the formatted interval
+ * @throws AssertionError if {@code range} is any other unit
+ */
+ public static String intervalYearMonthToString(int v, TimeUnitRange range) {
+ final StringBuilder buf = new StringBuilder();
+ // Emit the sign, then continue with the magnitude.
+ if (v >= 0) {
+ buf.append('+');
+ } else {
+ buf.append('-');
+ v = -v;
+ }
+ final int y;
+ final int m;
+ switch (range) {
+ case YEAR:
+ v = roundUp(v, 12);
+ y = v / 12;
+ buf.append(y);
+ break;
+ case YEAR_TO_MONTH:
+ y = v / 12;
+ buf.append(y);
+ buf.append('-');
+ m = v % 12;
+ number(buf, m, 2);
+ break;
+ case MONTH:
+ m = v;
+ buf.append(m);
+ break;
+ default:
+ throw new AssertionError(range);
+ }
+ return buf.toString();
+ }
+
+ public static StringBuilder number(StringBuilder buf, int v, int n) {
+ for (int k = digitCount(v); k < n; k++) {
+ buf.append('0');
+ }
+ return buf.append(v);
+ }
+
+ public static int digitCount(int v) {
+ for (int n = 1;; n++) {
+ v /= 10;
+ if (v == 0) {
+ return n;
+ }
+ }
+ }
+
+ private static int roundUp(int dividend, int divisor) {
+ int remainder = dividend % divisor;
+ dividend -= remainder;
+ if (remainder * 2 > divisor) {
+ dividend += divisor;
+ }
+ return dividend;
+ }
+
+ /** Cheap, unsafe, long power. {@code powerX(2, 3)} returns 8.
+  *
+  * <p>No overflow checking is performed, and any exponent that is zero or
+  * negative yields 1.
+  *
+  * @param a base
+  * @param b exponent
+  * @return {@code a} multiplied by itself {@code b} times */
+ public static long powerX(long a, long b) {
+   long result = 1;
+   for (long remaining = b; remaining > 0; remaining--) {
+     result *= a;
+   }
+   return result;
+ }
+
+ /**
+ * Formats a day-time interval as a signed string.
+ *
+ * <p>The output starts with '+' or '-'. The magnitude is first rounded to
+ * the smallest unit of {@code range} (for ranges ending in SECOND, to
+ * {@code scale} fractional digits) and then split into the fields the range
+ * names. The leading field is printed without padding; the following fields
+ * are zero-padded to two digits.
+ *
+ * @param v interval value in milliseconds
+ * @param range day-time unit range to format as
+ * @param scale number of fractional-second digits; only used by the ranges
+ * that end in SECOND
+ * @return the formatted interval
+ * @throws AssertionError if {@code range} is not a day-time range handled
+ * below
+ */
+ public static String intervalDayTimeToString(long v, TimeUnitRange range,
+ int scale) {
+ final StringBuilder buf = new StringBuilder();
+ // Emit the sign, then continue with the magnitude.
+ if (v >= 0) {
+ buf.append('+');
+ } else {
+ buf.append('-');
+ v = -v;
+ }
+ final long ms;
+ final long s;
+ final long m;
+ final long h;
+ final long d;
+ // Each case peels fields off v from the smallest unit upwards.
+ switch (range) {
+ case DAY_TO_SECOND:
+ v = roundUp(v, powerX(10, 3 - scale));
+ ms = v % 1000;
+ v /= 1000;
+ s = v % 60;
+ v /= 60;
+ m = v % 60;
+ v /= 60;
+ h = v % 24;
+ v /= 24;
+ d = v;
+ buf.append((int) d);
+ buf.append(' ');
+ number(buf, (int) h, 2);
+ buf.append(':');
+ number(buf, (int) m, 2);
+ buf.append(':');
+ number(buf, (int) s, 2);
+ fraction(buf, scale, ms);
+ break;
+ case DAY_TO_MINUTE:
+ v = roundUp(v, 1000 * 60);
+ v /= 1000;
+ v /= 60;
+ m = v % 60;
+ v /= 60;
+ h = v % 24;
+ v /= 24;
+ d = v;
+ buf.append((int) d);
+ buf.append(' ');
+ number(buf, (int) h, 2);
+ buf.append(':');
+ number(buf, (int) m, 2);
+ break;
+ case DAY_TO_HOUR:
+ v = roundUp(v, 1000 * 60 * 60);
+ v /= 1000;
+ v /= 60;
+ v /= 60;
+ h = v % 24;
+ v /= 24;
+ d = v;
+ buf.append((int) d);
+ buf.append(' ');
+ number(buf, (int) h, 2);
+ break;
+ case DAY:
+ v = roundUp(v, 1000 * 60 * 60 * 24);
+ d = v / (1000 * 60 * 60 * 24);
+ buf.append((int) d);
+ break;
+ case HOUR:
+ v = roundUp(v, 1000 * 60 * 60);
+ v /= 1000;
+ v /= 60;
+ v /= 60;
+ h = v;
+ buf.append((int) h);
+ break;
+ case HOUR_TO_MINUTE:
+ v = roundUp(v, 1000 * 60);
+ v /= 1000;
+ v /= 60;
+ m = v % 60;
+ v /= 60;
+ h = v;
+ buf.append((int) h);
+ buf.append(':');
+ number(buf, (int) m, 2);
+ break;
+ case HOUR_TO_SECOND:
+ v = roundUp(v, powerX(10, 3 - scale));
+ ms = v % 1000;
+ v /= 1000;
+ s = v % 60;
+ v /= 60;
+ m = v % 60;
+ v /= 60;
+ h = v;
+ buf.append((int) h);
+ buf.append(':');
+ number(buf, (int) m, 2);
+ buf.append(':');
+ number(buf, (int) s, 2);
+ fraction(buf, scale, ms);
+ break;
+ case MINUTE_TO_SECOND:
+ v = roundUp(v, powerX(10, 3 - scale));
+ ms = v % 1000;
+ v /= 1000;
+ s = v % 60;
+ v /= 60;
+ m = v;
+ buf.append((int) m);
+ buf.append(':');
+ number(buf, (int) s, 2);
+ fraction(buf, scale, ms);
+ break;
+ case MINUTE:
+ v = roundUp(v, 1000 * 60);
+ v /= 1000;
+ v /= 60;
+ m = v;
+ buf.append((int) m);
+ break;
+ case SECOND:
+ v = roundUp(v, powerX(10, 3 - scale));
+ ms = v % 1000;
+ v /= 1000;
+ s = v;
+ buf.append((int) s);
+ fraction(buf, scale, ms);
+ break;
+ default:
+ throw new AssertionError(range);
+ }
+ return buf.toString();
+ }
+
+ /**
+ * Rounds a dividend to the nearest divisor.
+ * For example roundUp(31, 10) yields 30; roundUp(37, 10) yields 40.
+ * @param dividend Number to be divided
+ * @param divisor Number to divide by
+ * @return Rounded dividend
+ */
+ private static long roundUp(long dividend, long divisor) {
+ long remainder = dividend % divisor;
+ dividend -= remainder;
+ if (remainder * 2 > divisor) {
+ dividend += divisor;
+ }
+ return dividend;
+ }
+
+ private static void fraction(StringBuilder buf, int scale, long ms) {
+ if (scale > 0) {
+ buf.append('.');
+ long v1 = scale == 3 ? ms
+ : scale == 2 ? ms / 10
+ : scale == 1 ? ms / 100
+ : 0;
+ number(buf, (int) v1, scale);
+ }
+ }
+
+ /**
+  * Parses a date written as year, year-month or year-month-day (fields
+  * separated by hyphens) into days since the epoch. A missing month or day
+  * defaults to 1. Each field is trimmed before it is parsed.
+  *
+  * @param s date string
+  * @return number of days since the epoch
+  * @throws NumberFormatException if a field is not a valid integer
+  */
+ public static int dateStringToUnixDate(String s) {
+   final int firstHyphen = s.indexOf('-');
+   if (firstHyphen < 0) {
+     // Year only.
+     return ymdToUnixDate(Integer.parseInt(s.trim()), 1, 1);
+   }
+   final int year = Integer.parseInt(s.substring(0, firstHyphen).trim());
+   final int secondHyphen = s.indexOf('-', firstHyphen + 1);
+   if (secondHyphen < 0) {
+     // Year and month.
+     final int monthOnly =
+         Integer.parseInt(s.substring(firstHyphen + 1).trim());
+     return ymdToUnixDate(year, monthOnly, 1);
+   }
+   final int month =
+       Integer.parseInt(s.substring(firstHyphen + 1, secondHyphen).trim());
+   final int day = Integer.parseInt(s.substring(secondHyphen + 1).trim());
+   return ymdToUnixDate(year, month, day);
+ }
+
+ /** Parses a time string into milliseconds since midnight, starting at the
+ * beginning of the string.
+ *
+ * @param v time string
+ * @return milliseconds since midnight */
+ public static int timeStringToUnixDate(String v) {
+ return timeStringToUnixDate(v, 0);
+ }
+
+ /**
+  * Parses a time written as hours, hours:minutes, hours:minutes:seconds or
+  * hours:minutes:seconds.fraction into milliseconds since midnight. Parsing
+  * begins at index {@code start}, so a time embedded in a longer string (for
+  * example the time part of a timestamp) can be read in place.
+  *
+  * <p>Fields that are absent default to zero. Each field is trimmed before
+  * it is parsed.
+  *
+  * @param v string containing the time
+  * @param start index in {@code v} at which the time begins
+  * @return milliseconds since midnight
+  * @throws NumberFormatException if a field is not a valid integer
+  * @throws StringIndexOutOfBoundsException if {@code start} lies outside
+  *         {@code v}
+  */
+ public static int timeStringToUnixDate(String v, int start) {
+   final int colon1 = v.indexOf(':', start);
+   final int hour;
+   // Absent fields are zero: "12" means 12:00:00, not 12:01:01.
+   int minute = 0;
+   int second = 0;
+   int milli = 0;
+   if (colon1 < 0) {
+     // Hour only. Honor 'start' so that any text before it (such as the date
+     // part of a timestamp) is not fed to the integer parser.
+     hour = Integer.parseInt(v.substring(start).trim());
+   } else {
+     hour = Integer.parseInt(v.substring(start, colon1).trim());
+     final int colon2 = v.indexOf(':', colon1 + 1);
+     if (colon2 < 0) {
+       minute = Integer.parseInt(v.substring(colon1 + 1).trim());
+     } else {
+       minute = Integer.parseInt(v.substring(colon1 + 1, colon2).trim());
+       final int dot = v.indexOf('.', colon2);
+       if (dot < 0) {
+         second = Integer.parseInt(v.substring(colon2 + 1).trim());
+       } else {
+         second = Integer.parseInt(v.substring(colon2 + 1, dot).trim());
+         milli = parseFraction(v.substring(dot + 1).trim(), 100);
+       }
+     }
+   }
+   return hour * (int) MILLIS_PER_HOUR
+       + minute * (int) MILLIS_PER_MINUTE
+       + second * (int) MILLIS_PER_SECOND
+       + milli;
+ }
+
+ /** Parses a fraction, multiplying the first character by {@code multiplier},
+ * the second character by {@code multiplier / 10},
+ * the third character by {@code multiplier / 100}, and so forth.
+ *
+ * <p>For example, {@code parseFraction("1234", 100)} yields {@code 123}. */
+ private static int parseFraction(String v, int multiplier) {
+ int r = 0;
+ for (int i = 0; i < v.length(); i++) {
+ char c = v.charAt(i);
+ int x = c < '0' || c > '9' ? 0 : (c - '0');
+ r += multiplier * x;
+ if (multiplier < 10) {
+ // We're at the last digit. Check for rounding.
+ if (i + 1 < v.length()
+ && v.charAt(i + 1) >= '5') {
+ ++r;
+ }
+ break;
+ }
+ multiplier /= 10;
+ }
+ return r;
+ }
+
+ /**
+  * Parses a timestamp string into milliseconds since the epoch. The input is
+  * trimmed; everything before the first space is parsed as the date and
+  * everything after it as the time. Without a space the whole input is
+  * parsed as a date at midnight.
+  *
+  * @param s timestamp string
+  * @return milliseconds since the epoch
+  */
+ public static long timestampStringToUnixDate(String s) {
+   final String text = s.trim();
+   final int space = text.indexOf(' ');
+   if (space < 0) {
+     final long dateOnly = dateStringToUnixDate(text);
+     return dateOnly * MILLIS_PER_DAY;
+   }
+   final long days = dateStringToUnixDate(text.substring(0, space));
+   final long millisOfDay = timeStringToUnixDate(text, space + 1);
+   return days * MILLIS_PER_DAY + millisOfDay;
+ }
+
+ /** Extracts a date field (such as year, month or day) from a date.
+ *
+ * @param range field to extract
+ * @param date number of days since the epoch; narrowed to {@code int}
+ * @return value of the requested field */
+ public static long unixDateExtract(TimeUnitRange range, long date) {
+ return julianExtract(range, (int) date + EPOCH_JULIAN);
+ }
+
+ /**
+  * Extracts a date field from a julian day number.
+  *
+  * <p>Supported fields are YEAR, QUARTER, MONTH, DAY, DOW (Sunday = 1,
+  * Saturday = 7), WEEK (ISO-8601 week number), DOY, CENTURY and MILLENNIUM.
+  *
+  * @param range field to extract
+  * @param julian julian day number
+  * @return value of the requested field
+  * @throws AssertionError if {@code range} is not one of the supported fields
+  */
+ private static int julianExtract(TimeUnitRange range, int julian) {
+   // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+   int b, c;
+   if (julian > 2299160) {
+     int a = julian + 32044;
+     b = (4 * a + 3) / 146097;
+     c = a - b * 146097 / 4;
+   } else {
+     b = 0;
+     c = julian + 32082;
+   }
+   int d = (4 * c + 3) / 1461;
+   int e = c - (1461 * d) / 4;
+   int m = (5 * e + 2) / 153;
+   int day = e - (153 * m + 2) / 5 + 1;
+   int month = m + 3 - 12 * (m / 10);
+   int year = b * 100 + d - 4800 + (m / 10);
+
+   switch (range) {
+   case YEAR:
+     return year;
+   case QUARTER:
+     return (month + 2) / 3;
+   case MONTH:
+     return month;
+   case DAY:
+     return day;
+   case DOW:
+     return (int) floorMod(julian + 1, 7) + 1; // sun=1, sat=7
+   case WEEK:
+     long fmofw = firstMondayOfFirstWeek(year);
+     if (julian < fmofw) {
+       // Early January days before the first Monday belong to the last
+       // week of the previous year.
+       fmofw = firstMondayOfFirstWeek(year - 1);
+     } else {
+       // Late December days on or after the next year's first Monday
+       // already belong to week 1 of that year (e.g. 2018-12-31).
+       final long next = firstMondayOfFirstWeek(year + 1);
+       if (julian >= next) {
+         fmofw = next;
+       }
+     }
+     return (int) (julian - fmofw) / 7 + 1;
+   case DOY:
+     final long janFirst = ymdToJulian(year, 1, 1);
+     return (int) (julian - janFirst) + 1;
+   case CENTURY:
+     return year > 0
+         ? (year + 99) / 100
+         : (year - 99) / 100;
+   case MILLENNIUM:
+     return year > 0
+         ? (year + 999) / 1000
+         : (year - 999) / 1000;
+   default:
+     throw new AssertionError(range);
+   }
+ }
+
+ /** Returns the first day of the first week of a year, as a julian day
+ * number.
+ * Per ISO-8601 it is the Monday of the week that contains Jan 4,
+ * or equivalently, it is a Monday between Dec 29 and Jan 4.
+ * Sometimes it is in the year before the given year. */
+ private static long firstMondayOfFirstWeek(int year) {
+ final long janFirst = ymdToJulian(year, 1, 1);
+ final long janFirstDow = floorMod(janFirst + 1, 7); // sun=0, sat=6
+ // Offset from Jan 1 to that Monday; ranges from -3 (Jan 1 is a Thursday)
+ // to +3 (Jan 1 is a Friday).
+ return janFirst + (11 - janFirstDow) % 7 - 3;
+ }
+
+ /** Extracts a time unit from a UNIX timestamp (milliseconds since epoch).
+ *
+ * <p>Only the time-of-day part of the timestamp is used; it is obtained
+ * with {@link #floorMod}, so timestamps before the epoch are handled too. */
+ public static int unixTimestampExtract(TimeUnitRange range,
+ long timestamp) {
+ return unixTimeExtract(range, (int) floorMod(timestamp, MILLIS_PER_DAY));
+ }
+
+ /** Extracts a time unit from a time value (milliseconds since midnight).
+  *
+  * <p>HOUR, MINUTE and SECOND are supported. The time must lie within a
+  * single day; this is checked with assertions only.
+  *
+  * @param range unit to extract
+  * @param time milliseconds since midnight
+  * @return value of the requested unit
+  * @throws AssertionError if {@code range} is any other unit */
+ public static int unixTimeExtract(TimeUnitRange range, int time) {
+   assert time >= 0;
+   assert time < MILLIS_PER_DAY;
+   switch (range) {
+   case HOUR:
+     return time / (int) MILLIS_PER_HOUR;
+   case MINUTE:
+     return (time / (int) MILLIS_PER_MINUTE) % 60;
+   case SECOND:
+     return (time / (int) MILLIS_PER_SECOND) % 60;
+   default:
+     throw new AssertionError(range);
+   }
+ }
+
+ /** Resets to zero the "time" part of a timestamp, i.e. moves the timestamp
+  * back to midnight of the same day.
+  *
+  * <p>The time of day is computed with {@link #floorMod}, so timestamps
+  * before the epoch are moved back to the preceding midnight rather than
+  * forward to the following one. As a result,
+  * {@code resetTime(t) + resetDate(t) == t} for every timestamp.
+  *
+  * @param timestamp milliseconds since the epoch
+  * @return the timestamp of midnight on the same day */
+ public static long resetTime(long timestamp) {
+   return timestamp - floorMod(timestamp, MILLIS_PER_DAY);
+ }
+
+ /** Resets to epoch (1970-01-01) the "date" part of a timestamp, keeping only
+ * the time of day.
+ *
+ * @param timestamp milliseconds since the epoch
+ * @return the timestamp modulo {@link #MILLIS_PER_DAY}, computed with
+ * {@link #floorMod} */
+ public static long resetDate(long timestamp) {
+ return floorMod(timestamp, MILLIS_PER_DAY);
+ }
+
+ /**
+  * Rounds a timestamp down to the start of the enclosing year or month.
+  *
+  * <p>The day is derived with {@link #floorDiv}, so timestamps before the
+  * epoch are assigned to the day they actually fall in instead of the
+  * following one.
+  *
+  * @param range unit to round to; YEAR or MONTH
+  * @param timestamp milliseconds since the epoch
+  * @return milliseconds since the epoch of the first instant of the unit
+  */
+ public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+   final int date = (int) floorDiv(timestamp, MILLIS_PER_DAY);
+   final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+   return (long) f * MILLIS_PER_DAY;
+ }
+
+ /** Rounds a date down to the first day of the enclosing year or month.
+ *
+ * @param range unit to round to; YEAR or MONTH
+ * @param date number of days since the epoch; narrowed to {@code int}
+ * @return number of days since the epoch of the first day of the unit */
+ public static long unixDateFloor(TimeUnitRange range, long date) {
+ return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+ }
+
+ /**
+  * Rounds a timestamp up to the start of the next year or month, unless it
+  * already is exactly the first instant of one.
+  *
+  * <p>The timestamp is first rounded up to a whole day. Without that step the
+  * time of day would be lost, and a timestamp such as the first of a month at
+  * 00:00:01 would incorrectly be returned as the start of that same month.
+  *
+  * @param range unit to round to; YEAR or MONTH
+  * @param timestamp milliseconds since the epoch
+  * @return milliseconds since the epoch of the first instant of the unit at
+  *         or after {@code timestamp}
+  */
+ public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+   int date = (int) floorDiv(timestamp, MILLIS_PER_DAY);
+   if (floorMod(timestamp, MILLIS_PER_DAY) != 0) {
+     // A non-zero time of day pushes the ceiling to the following day.
+     ++date;
+   }
+   final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+   return (long) f * MILLIS_PER_DAY;
+ }
+
+ /**
+  * Rounds a date up to the first day of the next year or month, unless it
+  * already is the first day of one.
+  *
+  * @param range unit to round to; YEAR or MONTH
+  * @param date number of days since the epoch; narrowed to {@code int}
+  * @return number of days since the epoch of the first day of the unit at or
+  *         after {@code date}
+  */
+ public static long unixDateCeil(TimeUnitRange range, long date) {
+   // floor = false selects the ceiling behavior of the shared helper.
+   return julianDateFloor(range, (int) date + EPOCH_JULIAN, false);
+ }
+
+ /**
+ * Rounds a julian day number down or up to the first day of a year or
+ * month.
+ *
+ * @param range unit to round to; only YEAR and MONTH are supported
+ * @param julian julian day number
+ * @param floor true to round down; false to round up (a date that already
+ * is the first day of the unit is left where it is)
+ * @return the rounded date as a number of days since the epoch (not a julian
+ * day number)
+ * @throws AssertionError if {@code range} is any other unit
+ */
+ private static int julianDateFloor(TimeUnitRange range, int julian,
+ boolean floor) {
+ // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+ int b, c;
+ if (julian > 2299160) {
+ int a = julian + 32044;
+ b = (4 * a + 3) / 146097;
+ c = a - b *146097 / 4;
+ } else {
+ b = 0;
+ c = julian + 32082;
+ }
+ int d = (4 * c + 3) / 1461;
+ int e = c - (1461 * d) / 4;
+ int m = (5 * e + 2) / 153;
+ int day = e - (153 * m + 2) / 5 + 1;
+ int month = m + 3 - 12 * (m / 10);
+ int year = b * 100 + d - 4800 + (m / 10);
+
+ switch (range) {
+ case YEAR:
+ if (!floor && (month > 1 || day > 1)) {
+ ++year;
+ }
+ return ymdToUnixDate(year, 1, 1);
+ case MONTH:
+ // When rounding up from December, month becomes 13 here and is passed on
+ // unchanged to ymdToUnixDate.
+ if (!floor && day > 1) {
+ ++month;
+ }
+ return ymdToUnixDate(year, month, 1);
+ default:
+ throw new AssertionError(range);
+ }
+ }
+
+ /** Converts a year, month and day into a number of days since the epoch
+ * (1970-01-01), via the julian day number.
+ *
+ * @param year year
+ * @param month month
+ * @param day day of month
+ * @return number of days since the epoch */
+ public static int ymdToUnixDate(int year, int month, int day) {
+ final int julian = ymdToJulian(year, month, day);
+ return julian - EPOCH_JULIAN;
+ }
+
+ /**
+  * Converts a year, month and day into a julian day number.
+  *
+  * <p>The result is first computed with the century corrections. If it falls
+  * before day 2299161, it is recomputed without them.
+  *
+  * @param year year
+  * @param month month
+  * @param day day of month
+  * @return julian day number
+  */
+ public static int ymdToJulian(int year, int month, int day) {
+   // Shift the year so that it starts in March; January and February then
+   // count as months 10 and 11 of the previous year.
+   final int a = (14 - month) / 12;
+   final int y = year + 4800 - a;
+   final int m = month + 12 * a - 3;
+   // Part shared by both variants of the formula.
+   final int base = day + (153 * m + 2) / 5 + 365 * y + y / 4;
+   final int withCenturies = base - y / 100 + y / 400 - 32045;
+   return withCenturies < 2299161 ? base - 32083 : withCenturies;
+ }
+
+ /** Builds a timestamp (milliseconds since the epoch) from calendar fields.
+ *
+ * <p>The time fields are simply scaled and added to the date, so they are
+ * not range-checked.
+ *
+ * @return milliseconds since the epoch */
+ public static long unixTimestamp(int year, int month, int day, int hour,
+ int minute, int second) {
+ final int date = ymdToUnixDate(year, month, day);
+ return (long) date * MILLIS_PER_DAY
+ + (long) hour * MILLIS_PER_HOUR
+ + (long) minute * MILLIS_PER_MINUTE
+ + (long) second * MILLIS_PER_SECOND;
+ }
+
+ /** Adds a given number of months to a timestamp, represented as the number
+ * of milliseconds since the epoch.
+ *
+ * <p>The time of day is split off, the months are added to the date part,
+ * and the time of day is added back unchanged. */
+ public static long addMonths(long timestamp, int m) {
+ final long millis =
+ DateTimeUtils.floorMod(timestamp, DateTimeUtils.MILLIS_PER_DAY);
+ // After this subtraction, timestamp is an exact multiple of a day.
+ timestamp -= millis;
+ final long x =
+ addMonths((int) (timestamp / DateTimeUtils.MILLIS_PER_DAY), m);
+ return x * DateTimeUtils.MILLIS_PER_DAY + millis;
+ }
+
+ /** Adds a given number of months to a date, represented as the number of
+  * days since the epoch.
+  *
+  * <p>The resulting year and month are normalized so that the month always
+  * lies in 1..12 before the day is validated. If the original day of month
+  * does not exist in the target month (for example January 31 plus one
+  * month), the result is the first day of the following month.
+  *
+  * @param date number of days since the epoch
+  * @param m number of months to add; may be negative
+  * @return number of days since the epoch of the shifted date */
+ public static int addMonths(int date, int m) {
+   int y0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.YEAR, date);
+   final int m0 =
+       (int) DateTimeUtils.unixDateExtract(TimeUnitRange.MONTH, date);
+   int d0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.DAY, date);
+
+   // Zero-based month index counted from January of y0; may be negative or
+   // larger than 11, hence the floor-based normalization below.
+   final long monthIndex = (long) m0 - 1 + m;
+   y0 += (int) DateTimeUtils.floorDiv(monthIndex, 12);
+   int month = (int) DateTimeUtils.floorMod(monthIndex, 12) + 1;
+
+   if (d0 > lastDay(y0, month)) {
+     // Day does not exist in the target month: roll over to the first day of
+     // the next month.
+     d0 = 1;
+     if (++month > 12) {
+       month = 1;
+       ++y0;
+     }
+   }
+   return DateTimeUtils.ymdToUnixDate(y0, month, d0);
+ }
+
+ private static int lastDay(int y, int m) {
+ switch (m) {
+ case 2:
+ return y % 4 == 0
+ && (y % 100 != 0
+ || y % 400 == 0)
+ ? 29 : 28;
+ case 4:
+ case 6:
+ case 9:
+ case 11:
+ return 30;
+ default:
+ return 31;
+ }
+ }
+
+ /** Finds the number of months between two dates, each represented as the
+ * number of days since the epoch.
+ *
+ * <p>If {@code date0} is earlier than {@code date1} the arguments are
+ * swapped and the result is negated.
+ *
+ * @param date0 later date, in days since the epoch
+ * @param date1 earlier date, in days since the epoch
+ * @return number of months from {@code date1} to {@code date0} */
+ public static int subtractMonths(int date0, int date1) {
+ if (date0 < date1) {
+ return -subtractMonths(date1, date0);
+ }
+ // Start with an estimate.
+ // Since no month has more than 31 days, the estimate is <= the true value.
+ int m = (date0 - date1) / 31;
+ // Step the estimate up one month at a time until adding m months reaches
+ // date0, or adding m + 1 months would go past it.
+ for (;;) {
+ int date2 = addMonths(date1, m);
+ if (date2 >= date0) {
+ return m;
+ }
+ int date3 = addMonths(date1, m + 1);
+ if (date3 > date0) {
+ return m;
+ }
+ ++m;
+ }
+ }
+
+ /** Finds the number of months between two timestamps, each represented as
+ * the number of milliseconds since the epoch.
+ *
+ * <p>The month difference of the date parts is computed first. It is then
+ * reduced by one when adding it to the second date lands exactly on the
+ * first date but the first timestamp's time of day is earlier.
+ *
+ * @param t0 first timestamp
+ * @param t1 second timestamp
+ * @return number of months from {@code t1} to {@code t0} */
+ public static int subtractMonths(long t0, long t1) {
+ // Split each timestamp into time of day (millis) and whole days (d).
+ final long millis0 =
+ DateTimeUtils.floorMod(t0, DateTimeUtils.MILLIS_PER_DAY);
+ final int d0 = (int) DateTimeUtils.floorDiv(t0 - millis0,
+ DateTimeUtils.MILLIS_PER_DAY);
+ final long millis1 =
+ DateTimeUtils.floorMod(t1, DateTimeUtils.MILLIS_PER_DAY);
+ final int d1 = (int) DateTimeUtils.floorDiv(t1 - millis1,
+ DateTimeUtils.MILLIS_PER_DAY);
+ int x = subtractMonths(d0, d1);
+ final long d2 = addMonths(d1, x);
+ if (d2 == d0 && millis0 < millis1) {
+ --x;
+ }
+ return x;
+ }
+
+ /** Divide, rounding towards negative infinity.
+  *
+  * @param x dividend
+  * @param y divisor
+  * @return the largest value {@code q} such that {@code q * y <= x} for
+  *         positive {@code y}
+  * @throws ArithmeticException if {@code y} is zero */
+ public static long floorDiv(long x, long y) {
+   final long quotient = x / y;
+   // Java division truncates towards zero. That only differs from flooring
+   // when the operands have opposite signs and the division is inexact.
+   final boolean oppositeSigns = (x ^ y) < 0;
+   if (oppositeSigns && quotient * y != x) {
+     return quotient - 1;
+   }
+   return quotient;
+ }
+
+ /** Modulo based on {@link #floorDiv}. The result is non-negative whenever
+ * {@code y} is positive; for a negative {@code y} the result is zero or
+ * negative. */
+ public static long floorMod(long x, long y) {
+ return x - floorDiv(x, y) * y;
+ }
+
+ /** Creates an instance of {@link Calendar} in the root locale and UTC time
+ * zone.
+ *
+ * @return the calendar obtained from {@link Calendar#getInstance} */
+ public static Calendar calendar() {
+ return Calendar.getInstance(UTC_ZONE, Locale.ROOT);
+ }
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ /**
+ * Helper class for {@link DateTimeUtils#parsePrecisionDateTimeLiteral}.
+ *
+ * <p>Bundles a calendar with the fractional-seconds digits and the precision
+ * supplied at construction. The calendar is stored and returned by
+ * reference, without copying.
+ */
+ public static class PrecisionTime {
+ // Calendar passed to the constructor; not copied.
+ private final Calendar cal;
+ // Fractional-seconds digits as text.
+ private final String fraction;
+ // Precision value passed to the constructor.
+ private final int precision;
+
+ /** Creates a PrecisionTime from its three components. */
+ public PrecisionTime(Calendar cal, String fraction, int precision) {
+ this.cal = cal;
+ this.fraction = fraction;
+ this.precision = precision;
+ }
+
+ /** Returns the calendar given at construction (the same instance). */
+ public Calendar getCalendar() {
+ return cal;
+ }
+
+ /** Returns the precision given at construction. */
+ public int getPrecision() {
+ return precision;
+ }
+
+ /** Returns the fraction string given at construction. */
+ public String getFraction() {
+ return fraction;
+ }
+ }
+}
+
+// End DateTimeUtils.java
http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
new file mode 100644
index 0000000..0955aeb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.runtime.PredicateImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Set;
+
+// This class is copied from Apache Calcite except that it does not
+// automatically name the field using the name of the operators
+// as the Table API rejects special characters like '-' in the field names.
+
+/**
+ * PushProjector is a utility class used to perform operations used in push
+ * projection rules.
+ *
+ * <p>Pushing is particularly interesting in the case of join, because there
+ * are multiple inputs. Generally an expression can be pushed down to a
+ * particular input if it depends upon no other inputs. If it can be pushed
+ * down to both sides, it is pushed down to the left.
+ *
+ * <p>Sometimes an expression needs to be split before it can be pushed down.
+ * To flag that an expression cannot be split, specify a rule that it must be
+ * <dfn>preserved</dfn>. Such an expression will be pushed down intact to one
+ * of the inputs, or not pushed down at all.</p>
+ */
+public class PushProjector {
+ //~ Instance fields --------------------------------------------------------
+
+ private final Project origProj;
+ private final RexNode origFilter;
+ private final RelNode childRel;
+ private final ExprCondition preserveExprCondition;
+ private final RelBuilder relBuilder;
+
+ /**
+ * Original projection expressions
+ */
+ final List<RexNode> origProjExprs;
+
+ /**
+ * Fields from the RelNode that the projection is being pushed past
+ */
+ final List<RelDataTypeField> childFields;
+
+ /**
+ * Number of fields in the RelNode that the projection is being pushed past
+ */
+ final int nChildFields;
+
+ /**
+ * Bitmap containing the references in the original projection
+ */
+ final BitSet projRefs;
+
+ /**
+ * Bitmap containing the fields in the RelNode that the projection is being
+ * pushed past, if the RelNode is not a join. If the RelNode is a join, then
+ * the fields correspond to the left hand side of the join.
+ */
+ final ImmutableBitSet childBitmap;
+
+ /**
+ * Bitmap containing the fields in the right hand side of a join, in the
+ * case where the projection is being pushed past a join. Not used
+ * otherwise.
+ */
+ final ImmutableBitSet rightBitmap;
+
+ /**
+ * Bitmap containing the fields that should be strong, i.e. an expression that
+ * uses any of these fields can only be preserved if it is null when these fields are null.
+ */
+ final ImmutableBitSet strongBitmap;
+
+ /**
+ * Number of fields in the RelNode that the projection is being pushed past,
+ * if the RelNode is not a join. If the RelNode is a join, then this is the
+ * number of fields in the left hand side of the join.
+ *
+ * <p>The identity
+ * {@code nChildFields == nSysFields + nFields + nFieldsRight}
+ * holds. {@code nFields} does not include {@code nSysFields}.
+ * The output of a join looks like this:
+ *
+ * <blockquote><pre>
+ * | nSysFields | nFields | nFieldsRight |
+ * </pre></blockquote>
+ *
+ * <p>The output of a single-input rel looks like this:
+ *
+ * <blockquote><pre>
+ * | nSysFields | nFields |
+ * </pre></blockquote>
+ */
+ final int nFields;
+
+ /**
+ * Number of fields in the right hand side of a join, in the case where the
+ * projection is being pushed past a join. Always 0 otherwise.
+ */
+ final int nFieldsRight;
+
+ /**
+ * Number of system fields. System fields appear at the start of a join,
+ * before the first field from the left input.
+ */
+ private final int nSysFields;
+
+ /**
+ * Expressions referenced in the projection/filter that should be preserved.
+ * In the case where the projection is being pushed past a join, then the
+ * list only contains the expressions corresponding to the left hand side of
+ * the join.
+ */
+ final List<RexNode> childPreserveExprs;
+
+ /**
+ * Expressions referenced in the projection/filter that should be preserved,
+ * corresponding to expressions on the right hand side of the join, if the
+ * projection is being pushed past a join. Empty list otherwise.
+ */
+ final List<RexNode> rightPreserveExprs;
+
+ /**
+ * Number of system fields being projected.
+ */
+ int nSystemProject;
+
+ /**
+ * Number of fields being projected. In the case where the projection is
+ * being pushed past a join, the number of fields being projected from the
+ * left hand side of the join.
+ */
+ int nProject;
+
+ /**
+ * Number of fields being projected from the right hand side of a join, in
+ * the case where the projection is being pushed past a join. 0 otherwise.
+ */
+ int nRightProject;
+
+ /**
+ * Rex builder used to create new expressions.
+ */
+ final RexBuilder rexBuilder;
+
+ //~ Constructors -----------------------------------------------------------
+
+ /**
+ * Creates a PushProjector object for pushing projects past a RelNode.
+ *
+ * @param origProj the original projection that is being pushed;
+ * may be null if the projection is implied as a
+ * result of a projection having been trivially
+ * removed
+ * @param origFilter the filter that the projection must also be
+ * pushed past, if applicable
+ * @param childRel the RelNode that the projection is being
+ * pushed past
+ * @param preserveExprCondition condition for whether an expression should
+ * be preserved in the projection
+ */
+ public PushProjector(
+ Project origProj,
+ RexNode origFilter,
+ RelNode childRel,
+ ExprCondition preserveExprCondition,
+ RelBuilder relBuilder) {
+ this.origProj = origProj;
+ this.origFilter = origFilter;
+ this.childRel = childRel;
+ this.preserveExprCondition = preserveExprCondition;
+ this.relBuilder = Preconditions.checkNotNull(relBuilder);
+ if (origProj == null) {
+ origProjExprs = ImmutableList.of();
+ } else {
+ origProjExprs = origProj.getProjects();
+ }
+
+ childFields = childRel.getRowType().getFieldList();
+ nChildFields = childFields.size();
+
+ projRefs = new BitSet(nChildFields);
+ if (childRel instanceof Join) {
+ Join joinRel = (Join) childRel;
+ List<RelDataTypeField> leftFields =
+ joinRel.getLeft().getRowType().getFieldList();
+ List<RelDataTypeField> rightFields =
+ joinRel.getRight().getRowType().getFieldList();
+ nFields = leftFields.size();
+ nFieldsRight = childRel instanceof SemiJoin ? 0 : rightFields.size();
+ nSysFields = joinRel.getSystemFieldList().size();
+ childBitmap =
+ ImmutableBitSet.range(nSysFields, nFields + nSysFields);
+ rightBitmap =
+ ImmutableBitSet.range(nFields + nSysFields, nChildFields);
+
+ switch (joinRel.getJoinType()) {
+ case INNER:
+ strongBitmap = ImmutableBitSet.of();
+ break;
+ case RIGHT: // All the left-input's columns must be strong
+ strongBitmap = ImmutableBitSet.range(nSysFields, nFields + nSysFields);
+ break;
+ case LEFT: // All the right-input's columns must be strong
+ strongBitmap = ImmutableBitSet.range(nFields + nSysFields, nChildFields);
+ break;
+ case FULL:
+ default:
+ strongBitmap = ImmutableBitSet.range(nSysFields, nChildFields);
+ }
+
+ } else {
+ nFields = nChildFields;
+ nFieldsRight = 0;
+ childBitmap = ImmutableBitSet.range(nChildFields);
+ rightBitmap = null;
+ nSysFields = 0;
+ strongBitmap = ImmutableBitSet.of();
+ }
+ assert nChildFields == nSysFields + nFields + nFieldsRight;
+
+ childPreserveExprs = new ArrayList<RexNode>();
+ rightPreserveExprs = new ArrayList<RexNode>();
+
+ rexBuilder = childRel.getCluster().getRexBuilder();
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ /**
+ * Decomposes a projection to the input references referenced by a
+ * projection and a filter, either of which is optional. If both are
+ * provided, the filter is underneath the project.
+ *
+ * <p>Creates a projection containing all input references as well as
+ * preserving any special expressions. Converts the original projection
+ * and/or filter to reference the new projection. Then, finally puts on top,
+ * a final projection corresponding to the original projection.
+ *
+ * @param defaultExpr expression to be used in the projection if no fields
+ * or special columns are selected
+ * @return the converted projection if it makes sense to push elements of
+ * the projection; otherwise returns null
+ */
+ public RelNode convertProject(RexNode defaultExpr) {
+ // locate all fields referenced in the projection and filter
+ locateAllRefs();
+
+ // if all columns are being selected (either explicitly in the
+ // projection or via a "select *"), then there needs to be some
+ // special expressions to preserve in the projection; otherwise,
+ // there's no point in proceeding any further
+ if (origProj == null) {
+ if (childPreserveExprs.size() == 0) {
+ return null;
+ }
+
+ // even though there is no projection, this is the same as
+ // selecting all fields
+ if (nChildFields > 0) {
+ // Calling with nChildFields == 0 should be safe but hits
+ // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6222207
+ projRefs.set(0, nChildFields);
+ }
+ nProject = nChildFields;
+ } else if (
+ (projRefs.cardinality() == nChildFields)
+ && (childPreserveExprs.size() == 0)) {
+ return null;
+ }
+
+ // if nothing is being selected from the underlying rel, just
+ // project the default expression passed in as a parameter or the
+ // first column if there is no default expression
+ if ((projRefs.cardinality() == 0) && (childPreserveExprs.size() == 0)) {
+ if (defaultExpr != null) {
+ childPreserveExprs.add(defaultExpr);
+ } else if (nChildFields == 1) {
+ return null;
+ } else {
+ projRefs.set(0);
+ nProject = 1;
+ }
+ }
+
+ // create a new projection referencing all fields referenced in
+ // either the project or the filter
+ RelNode newProject = createProjectRefsAndExprs(childRel, false, false);
+
+ int[] adjustments = getAdjustments();
+
+ // if a filter was passed in, convert it to reference the projected
+ // columns, placing it on top of the project just created
+ RelNode projChild;
+ if (origFilter != null) {
+ RexNode newFilter =
+ convertRefsAndExprs(
+ origFilter,
+ newProject.getRowType().getFieldList(),
+ adjustments);
+ relBuilder.push(newProject);
+ relBuilder.filter(newFilter);
+ projChild = relBuilder.build();
+ } else {
+ projChild = newProject;
+ }
+
+ // put the original project on top of the filter/project, converting
+ // it to reference the modified projection list; otherwise, create
+ // a projection that essentially selects all fields
+ return createNewProject(projChild, adjustments);
+ }
+
+ /**
+ * Locates all references found in either the projection expressions or a
+ * filter, as well as references to expressions that should be preserved.
+ * Based on that, determines whether pushing the projection makes sense.
+ *
+ * @return true if all inputs from the child that the projection is being
+ * pushed past are referenced in the projection/filter and no special
+ * preserve expressions are referenced; in that case, it does not make sense
+ * to push the projection
+ */
+ public boolean locateAllRefs() {
+ RexUtil.apply(
+ new InputSpecialOpFinder(
+ projRefs,
+ childBitmap,
+ rightBitmap,
+ strongBitmap,
+ preserveExprCondition,
+ childPreserveExprs,
+ rightPreserveExprs),
+ origProjExprs,
+ origFilter);
+
+ // The system fields of each child are always used by the join, even if
+ // they are not projected out of it.
+ projRefs.set(
+ nSysFields,
+ nSysFields + nSysFields,
+ true);
+ projRefs.set(
+ nSysFields + nFields,
+ nSysFields + nFields + nSysFields,
+ true);
+
+ // Count how many fields are projected.
+ nSystemProject = 0;
+ nProject = 0;
+ nRightProject = 0;
+ for (int bit : BitSets.toIter(projRefs)) {
+ if (bit < nSysFields) {
+ nSystemProject++;
+ } else if (bit < nSysFields + nFields) {
+ nProject++;
+ } else {
+ nRightProject++;
+ }
+ }
+
+ assert nSystemProject + nProject + nRightProject
+ == projRefs.cardinality();
+
+ if ((childRel instanceof Join)
+ || (childRel instanceof SetOp)) {
+ // if nothing is projected from the children, arbitrarily project
+ // the first columns; this is necessary since Fennel doesn't
+ // handle 0-column projections
+ if ((nProject == 0) && (childPreserveExprs.size() == 0)) {
+ projRefs.set(0);
+ nProject = 1;
+ }
+ if (childRel instanceof Join) {
+ if ((nRightProject == 0) && (rightPreserveExprs.size() == 0)) {
+ projRefs.set(nFields);
+ nRightProject = 1;
+ }
+ }
+ }
+
+ // no need to push projections if all children fields are being
+ // referenced and there are no special preserve expressions; note
+ // that we need to do this check after we've handled the 0-column
+ // project cases
+ if (projRefs.cardinality() == nChildFields
+ && childPreserveExprs.size() == 0
+ && rightPreserveExprs.size() == 0) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Creates a projection based on the inputs specified in a bitmap and the
+ * expressions that need to be preserved. The expressions are appended after
+ * the input references.
+ *
+ * @param projChild child that the projection will be created on top of
+ * @param adjust if true, need to create new projection expressions;
+ * otherwise, the existing ones are reused
+ * @param rightSide if true, creating a projection for the right hand side
+ * of a join
+ * @return created projection
+ */
+ public Project createProjectRefsAndExprs(
+ RelNode projChild,
+ boolean adjust,
+ boolean rightSide) {
+ List<RexNode> preserveExprs;
+ int nInputRefs;
+ int offset;
+
+ if (rightSide) {
+ preserveExprs = rightPreserveExprs;
+ nInputRefs = nRightProject;
+ offset = nSysFields + nFields;
+ } else {
+ preserveExprs = childPreserveExprs;
+ nInputRefs = nProject;
+ offset = nSysFields;
+ }
+ int refIdx = offset - 1;
+ List<Pair<RexNode, String>> newProjects =
+ new ArrayList<Pair<RexNode, String>>();
+ List<RelDataTypeField> destFields =
+ projChild.getRowType().getFieldList();
+
+ // add on the input references
+ for (int i = 0; i < nInputRefs; i++) {
+ refIdx = projRefs.nextSetBit(refIdx + 1);
+ assert refIdx >= 0;
+ final RelDataTypeField destField = destFields.get(refIdx - offset);
+ newProjects.add(
+ Pair.of(
+ (RexNode) rexBuilder.makeInputRef(
+ destField.getType(), refIdx - offset),
+ destField.getName()));
+ }
+
+ // add on the expressions that need to be preserved, converting the
+ // arguments to reference the projected columns (if necessary)
+ int[] adjustments = {};
+ if ((preserveExprs.size() > 0) && adjust) {
+ adjustments = new int[childFields.size()];
+ for (int idx = offset; idx < childFields.size(); idx++) {
+ adjustments[idx] = -offset;
+ }
+ }
+ for (RexNode projExpr : preserveExprs) {
+ RexNode newExpr;
+ if (adjust) {
+ newExpr =
+ projExpr.accept(
+ new RelOptUtil.RexInputConverter(
+ rexBuilder,
+ childFields,
+ destFields,
+ adjustments));
+ } else {
+ newExpr = projExpr;
+ }
+ newProjects.add(
+ Pair.of(
+ newExpr,
+ null));
+ }
+
+ return (Project) RelOptUtil.createProject(
+ projChild,
+ Pair.left(newProjects),
+ Pair.right(newProjects),
+ false,
+ relBuilder);
+ }
+
+ /**
+ * Determines how much each input reference needs to be adjusted as a result
+ * of projection.
+ *
+ * <p>Walks the set bits of {@code projRefs} in ascending order. The entry for
+ * a referenced position {@code pos} is {@code newIdx - pos}, where
+ * {@code newIdx} is the number of referenced positions before it. Positions
+ * at or beyond {@code nSysFields + nFields} are additionally shifted by
+ * {@code childPreserveExprs.size()}. Entries for positions that are not set
+ * in {@code projRefs} stay 0.
+ *
+ * @return array of length {@code nChildFields} indicating how much each
+ * input needs to be adjusted by
+ */
+ public int[] getAdjustments() {
+ int[] adjustments = new int[nChildFields];
+ int newIdx = 0;
+ int rightOffset = childPreserveExprs.size();
+ for (int pos : BitSets.toIter(projRefs)) {
+ adjustments[pos] = -(pos - newIdx);
+ if (pos >= nSysFields + nFields) {
+ adjustments[pos] += rightOffset;
+ }
+ newIdx++;
+ }
+ return adjustments;
+ }
+
+ /**
+ * Clones an expression tree and walks through it, adjusting each
+ * RexInputRef index by some amount, and converting expressions that need to
+ * be preserved to field references.
+ *
+ * @param rex the expression
+ * @param destFields fields that the new expressions will be referencing
+ * @param adjustments the amount each input reference index needs to be
+ * adjusted by
+ * @return modified expression tree
+ */
+ public RexNode convertRefsAndExprs(
+ RexNode rex,
+ List<RelDataTypeField> destFields,
+ int[] adjustments) {
+ return rex.accept(
+ new RefAndExprConverter(
+ rexBuilder,
+ childFields,
+ destFields,
+ adjustments,
+ childPreserveExprs,
+ nProject,
+ rightPreserveExprs,
+ nProject + childPreserveExprs.size() + nRightProject));
+ }
+
+ /**
+ * Creates a new projection based on the original projection, adjusting all
+ * input refs using an adjustment array passed in. If there was no original
+ * projection, create a new one that selects every field from the underlying
+ * rel.
+ *
+ * <p>If the resulting projection would be trivial, return the child.
+ *
+ * @param projChild child of the new project
+ * @param adjustments array indicating how much each input reference should
+ * be adjusted by
+ * @return the created projection
+ */
+ public RelNode createNewProject(RelNode projChild, int[] adjustments) {
+ final List<Pair<RexNode, String>> projects = Lists.newArrayList();
+
+ if (origProj != null) {
+ for (Pair<RexNode, String> p : origProj.getNamedProjects()) {
+ projects.add(
+ Pair.of(
+ convertRefsAndExprs(
+ p.left,
+ projChild.getRowType().getFieldList(),
+ adjustments),
+ p.right));
+ }
+ } else {
+ for (Ord<RelDataTypeField> field : Ord.zip(childFields)) {
+ projects.add(
+ Pair.of(
+ (RexNode) rexBuilder.makeInputRef(
+ field.e.getType(), field.i), field.e.getName()));
+ }
+ }
+ return RelOptUtil.createProject(
+ projChild,
+ Pair.left(projects),
+ Pair.right(projects),
+ true /* optimize to avoid trivial projections, as per javadoc */,
+ relBuilder);
+ }
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ /**
+ * Visitor which builds a bitmap of the inputs used by an expression, as
+ * well as locating expressions corresponding to special operators.
+ */
+ private class InputSpecialOpFinder extends RexVisitorImpl<Void> {
+ private final BitSet rexRefs;
+ private final ImmutableBitSet leftFields;
+ private final ImmutableBitSet rightFields;
+ private final ImmutableBitSet strongFields;
+ private final ExprCondition preserveExprCondition;
+ private final List<RexNode> preserveLeft;
+ private final List<RexNode> preserveRight;
+ private final Strong strong;
+
+ public InputSpecialOpFinder(
+ BitSet rexRefs,
+ ImmutableBitSet leftFields,
+ ImmutableBitSet rightFields,
+ final ImmutableBitSet strongFields,
+ ExprCondition preserveExprCondition,
+ List<RexNode> preserveLeft,
+ List<RexNode> preserveRight) {
+ super(true);
+ this.rexRefs = rexRefs;
+ this.leftFields = leftFields;
+ this.rightFields = rightFields;
+ this.preserveExprCondition = preserveExprCondition;
+ this.preserveLeft = preserveLeft;
+ this.preserveRight = preserveRight;
+
+ this.strongFields = strongFields;
+ this.strong = Strong.of(strongFields);
+ }
+
+ public Void visitCall(RexCall call) {
+ if (preserve(call)) {
+ return null;
+ }
+ super.visitCall(call);
+ return null;
+ }
+
+ private boolean isStrong(final ImmutableBitSet exprArgs, final RexNode call) {
+ // If the expressions do not use any of the inputs that require output to be null,
+ // no need to check. Otherwise, check that the expression is null.
+ // For example, in a "left outer join", we don't require expressions
+ // pushed down into the left input to be strong. On the other hand,
+ // expressions pushed into the right input must be. In that case,
+ // strongFields == right input fields.
+ return !strongFields.intersects(exprArgs) || strong.isNull(call);
+ }
+
+ private boolean preserve(RexNode call) {
+ if (preserveExprCondition.test(call)) {
+ // if the arguments of the expression only reference the
+ // left hand side, preserve it on the left; similarly, if
+ // it only references expressions on the right
+ final ImmutableBitSet exprArgs = RelOptUtil.InputFinder.bits(call);
+ if (exprArgs.cardinality() > 0) {
+ if (leftFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+ addExpr(preserveLeft, call);
+ return true;
+ } else if (rightFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+ assert preserveRight != null;
+ addExpr(preserveRight, call);
+ return true;
+ }
+ }
+ // if the expression arguments reference both the left and
+ // right, fall through and don't attempt to preserve the
+ // expression, but instead locate references and special
+ // ops in the call operands
+ }
+ return false;
+ }
+
+ public Void visitInputRef(RexInputRef inputRef) {
+ rexRefs.set(inputRef.getIndex());
+ return null;
+ }
+
+ /**
+ * Adds an expression to a list if the same expression isn't already in
+ * the list. Expressions are treated as identical if their
+ * {@code toString()} representations are equal.
+ *
+ * @param exprList current list of expressions; appended to in place
+ * @param newExpr new expression to be added
+ */
+ private void addExpr(List<RexNode> exprList, RexNode newExpr) {
+ String newExprString = newExpr.toString();
+ for (RexNode expr : exprList) {
+ if (newExprString.compareTo(expr.toString()) == 0) {
+ return;
+ }
+ }
+ exprList.add(newExpr);
+ }
+ }
+
+ /**
+ * Walks an expression tree, replacing input refs with new values to reflect
+ * projection and converting special expressions to field references.
+ */
+ private class RefAndExprConverter extends RelOptUtil.RexInputConverter {
+ private final List<RexNode> preserveLeft;
+ private final int firstLeftRef;
+ private final List<RexNode> preserveRight;
+ private final int firstRightRef;
+
+ public RefAndExprConverter(
+ RexBuilder rexBuilder,
+ List<RelDataTypeField> srcFields,
+ List<RelDataTypeField> destFields,
+ int[] adjustments,
+ List<RexNode> preserveLeft,
+ int firstLeftRef,
+ List<RexNode> preserveRight,
+ int firstRightRef) {
+ super(rexBuilder, srcFields, destFields, adjustments);
+ this.preserveLeft = preserveLeft;
+ this.firstLeftRef = firstLeftRef;
+ this.preserveRight = preserveRight;
+ this.firstRightRef = firstRightRef;
+ }
+
+ public RexNode visitCall(RexCall call) {
+ // if the expression corresponds to one that needs to be preserved,
+ // convert it to a field reference; otherwise, convert the entire
+ // expression
+ int match =
+ findExprInLists(
+ call,
+ preserveLeft,
+ firstLeftRef,
+ preserveRight,
+ firstRightRef);
+ if (match >= 0) {
+ return rexBuilder.makeInputRef(
+ destFields.get(match).getType(),
+ match);
+ }
+ return super.visitCall(call);
+ }
+
+ /**
+ * Looks for a matching RexNode from among two lists of RexNodes and
+ * returns the offset into the list corresponding to the match, adjusted
+ * by an amount, depending on whether the match was from the first or
+ * second list.
+ *
+ * @param rex RexNode that is being matched against
+ * @param rexList1 first list of RexNodes
+ * @param adjust1 adjustment if match occurred in first list
+ * @param rexList2 second list of RexNodes
+ * @param adjust2 adjustment if match occurred in the second list
+ * @return index of the matching RexNode within its list plus the
+ * adjustment for that list; -1 if no match
+ */
+ private int findExprInLists(
+ RexNode rex,
+ List<RexNode> rexList1,
+ int adjust1,
+ List<RexNode> rexList2,
+ int adjust2) {
+ int match = findExprInList(rex, rexList1);
+ if (match >= 0) {
+ return match + adjust1;
+ }
+
+ if (rexList2 != null) {
+ match = findExprInList(rex, rexList2);
+ if (match >= 0) {
+ return match + adjust2;
+ }
+ }
+
+ return -1;
+ }
+
+ private int findExprInList(RexNode rex, List<RexNode> rexList) {
+ int match = 0;
+ for (RexNode rexElement : rexList) {
+ if (rexElement.toString().compareTo(rex.toString()) == 0) {
+ return match;
+ }
+ match++;
+ }
+ return -1;
+ }
+ }
+
+ /**
+ * A functor that replies true or false for a given expression.
+ *
+ * @see org.apache.calcite.rel.rules.PushProjector.OperatorExprCondition
+ */
+ public interface ExprCondition extends Predicate<RexNode> {
+ /**
+ * Evaluates a condition for a given expression.
+ *
+ * @param expr Expression
+ * @return result of evaluating the condition
+ */
+ boolean test(RexNode expr);
+
+ /**
+ * Constant condition that replies {@code false} for all expressions.
+ */
+ ExprCondition FALSE =
+ new ExprConditionImpl() {
+ @Override public boolean test(RexNode expr) {
+ return false;
+ }
+ };
+
+ /**
+ * Constant condition that replies {@code true} for all expressions.
+ */
+ ExprCondition TRUE =
+ new ExprConditionImpl() {
+ @Override public boolean test(RexNode expr) {
+ return true;
+ }
+ };
+ }
+
+ /** Implementation of {@link ExprCondition}. */
+ abstract static class ExprConditionImpl extends PredicateImpl<RexNode>
+ implements ExprCondition {
+ }
+
+ /**
+ * An expression condition that evaluates to true if the expression is
+ * a call to one of a set of operators.
+ */
+ class OperatorExprCondition extends ExprConditionImpl {
+ private final Set<SqlOperator> operatorSet;
+
+ /**
+ * Creates an OperatorExprCondition.
+ *
+ * @param operatorSet Set of operators
+ */
+ public OperatorExprCondition(Iterable<? extends SqlOperator> operatorSet) {
+ this.operatorSet = ImmutableSet.copyOf(operatorSet);
+ }
+
+ public boolean test(RexNode expr) {
+ return expr instanceof RexCall
+ && operatorSet.contains(((RexCall) expr).getOperator());
+ }
+ }
+}
+
+// End PushProjector.java
http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
deleted file mode 100644
index a57cf10..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-/*
- * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1761 IS FIXED.
- */
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/**
- * SQL function that computes keys by which rows can be partitioned and
- * aggregated.
- *
- * <p>Grouped window functions always occur in the GROUP BY clause. They often
- * have auxiliary functions that access information about the group. For
- * example, {@code HOP} is a group function, and its auxiliary functions are
- * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
- * query:
- *
- * <blockquote><pre>
- * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
- * HOP_END(rowtime, INTERVAL '1' HOUR),
- * MIN(unitPrice)
- * FROM Orders
- * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
- * </pre></blockquote>
- */
-class SqlGroupFunction extends SqlFunction {
- /** The grouped function, if this an auxiliary function; null otherwise. */
- final SqlGroupFunction groupFunction;
-
- /** Creates a SqlGroupFunction.
- *
- * @param kind Kind; also determines function name
- * @param groupFunction Group function, if this is an auxiliary;
- * null, if this is a group function
- * @param operandTypeChecker Operand type checker
- */
- SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction,
- SqlOperandTypeChecker operandTypeChecker) {
- super(kind.name(), kind, ReturnTypes.ARG0, null,
- operandTypeChecker, SqlFunctionCategory.SYSTEM);
- this.groupFunction = groupFunction;
- if (groupFunction != null) {
- assert groupFunction.groupFunction == null;
- }
- }
-
- /** Creates an auxiliary function from this grouped window function. */
- SqlGroupFunction auxiliary(SqlKind kind) {
- return new SqlGroupFunction(kind, this, getOperandTypeChecker());
- }
-
- /** Returns a list of this grouped window function's auxiliary functions. */
- List<SqlGroupFunction> getAuxiliaryFunctions() {
- return ImmutableList.of();
- }
-
- @Override public boolean isGroup() {
- // Auxiliary functions are not group functions
- return groupFunction == null;
- }
-
- @Override public boolean isGroupAuxiliary() {
- return groupFunction != null;
- }
-
- @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
- // Monotonic iff its first argument is, but not strict.
- //
- // Note: This strategy happens to works for all current group functions
- // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
- // make the method abstract.
- return call.getOperandMonotonicity(0).unstrict();
- }
-}
-
-// End SqlGroupFunction.java