You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/09/14 13:33:11 UTC

[GitHub] flink pull request #4658: [Flink-7596][Table API & SQL] fix bug when Set Ope...

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4658#discussion_r138894127
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java ---
    @@ -0,0 +1,575 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.calcite.sql.type;
    +
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
    +import org.apache.calcite.rel.type.RelDataTypeFamily;
    +import org.apache.calcite.rel.type.RelDataTypeSystem;
    +import org.apache.calcite.sql.SqlCollation;
    +import org.apache.calcite.sql.SqlIntervalQualifier;
    +import org.apache.calcite.util.Glossary;
    +import org.apache.calcite.util.Util;
    +import org.apache.flink.table.plan.schema.GenericRelDataType;
    +
    +import java.nio.charset.Charset;
    +import java.util.List;
    +
    +/**
    + * SqlTypeFactoryImpl provides a default implementation of
    + * {@link RelDataTypeFactory} which supports SQL types.
    + */
    +public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl {
    +	//~ Constructors -----------------------------------------------------------
    +
    +	public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
    +		super(typeSystem);
    +	}
    +
    +	//~ Methods ----------------------------------------------------------------
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createSqlType(SqlTypeName typeName) {
    +		if (typeName.allowsPrec()) {
    +			return createSqlType(typeName, typeSystem.getDefaultPrecision(typeName));
    +		}
    +		assertBasic(typeName);
    +		RelDataType newType = new BasicSqlType(typeSystem, typeName);
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createSqlType(
    +		SqlTypeName typeName,
    +		int precision) {
    +		final int maxPrecision = typeSystem.getMaxPrecision(typeName);
    +		if (maxPrecision >= 0 && precision > maxPrecision) {
    +			precision = maxPrecision;
    +		}
    +		if (typeName.allowsScale()) {
    +			return createSqlType(typeName, precision, typeName.getDefaultScale());
    +		}
    +		assertBasic(typeName);
    +		assert (precision >= 0)
    +			|| (precision == RelDataType.PRECISION_NOT_SPECIFIED);
    +		RelDataType newType = new BasicSqlType(typeSystem, typeName, precision);
    +		newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createSqlType(
    +		SqlTypeName typeName,
    +		int precision,
    +		int scale) {
    +		assertBasic(typeName);
    +		assert (precision >= 0)
    +			|| (precision == RelDataType.PRECISION_NOT_SPECIFIED);
    +		final int maxPrecision = typeSystem.getMaxPrecision(typeName);
    +		if (maxPrecision >= 0 && precision > maxPrecision) {
    +			precision = maxPrecision;
    +		}
    +		RelDataType newType =
    +			new BasicSqlType(typeSystem, typeName, precision, scale);
    +		newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createMultisetType(
    +		RelDataType type,
    +		long maxCardinality) {
    +		assert maxCardinality == -1;
    +		RelDataType newType = new MultisetSqlType(type, false);
    +		return canonize(newType);
    +	}
    +
    +	public RelDataType createArrayType(
    +		RelDataType elementType,
    +		long maxCardinality) {
    +		assert maxCardinality == -1;
    +		ArraySqlType newType = new ArraySqlType(elementType, false);
    +		return canonize(newType);
    +	}
    +
    +	public RelDataType createMapType(
    +		RelDataType keyType,
    +		RelDataType valueType) {
    +		MapSqlType newType = new MapSqlType(keyType, valueType, false);
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createSqlIntervalType(
    +		SqlIntervalQualifier intervalQualifier) {
    +		RelDataType newType =
    +			new IntervalSqlType(typeSystem, intervalQualifier, false);
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createTypeWithCharsetAndCollation(
    +		RelDataType type,
    +		Charset charset,
    +		SqlCollation collation) {
    +		assert SqlTypeUtil.inCharFamily(type) : type;
    +		assert charset != null;
    +		assert collation != null;
    +		RelDataType newType;
    +		if (type instanceof BasicSqlType) {
    +			BasicSqlType sqlType = (BasicSqlType) type;
    +			newType = sqlType.createWithCharsetAndCollation(charset, collation);
    +		} else if (type instanceof JavaType) {
    +			JavaType javaType = (JavaType) type;
    +			newType =
    +				new JavaType(
    +					javaType.getJavaClass(),
    +					javaType.isNullable(),
    +					charset,
    +					collation);
    +		} else {
    +			throw Util.needToImplement("need to implement " + type);
    +		}
    +		return canonize(newType);
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType leastRestrictive(List<RelDataType> types) {
    +		assert types != null;
    +		assert types.size() >= 1;
    +
    +		RelDataType type0 = types.get(0);
    +		if (type0.getSqlTypeName() != null) {
    +			RelDataType resultType = leastRestrictiveSqlType(types);
    +			if (resultType != null) {
    +				return resultType;
    +			}
    +			return leastRestrictiveByCast(types);
    +		}
    +
    +		return super.leastRestrictive(types);
    +	}
    +
    +	private RelDataType leastRestrictiveByCast(List<RelDataType> types) {
    +		RelDataType resultType = types.get(0);
    +		boolean anyNullable = resultType.isNullable();
    +		for (int i = 1; i < types.size(); i++) {
    +			RelDataType type = types.get(i);
    +			if (type.getSqlTypeName() == SqlTypeName.NULL) {
    +				anyNullable = true;
    +				continue;
    +			}
    +
    +			if (type.isNullable()) {
    +				anyNullable = true;
    +			}
    +
    +			if (SqlTypeUtil.canCastFrom(type, resultType, false)) {
    +				resultType = type;
    +			} else {
    +				if (!SqlTypeUtil.canCastFrom(resultType, type, false)) {
    +					return null;
    +				}
    +			}
    +		}
    +		if (anyNullable) {
    +			return createTypeWithNullability(resultType, true);
    +		} else {
    +			return resultType;
    +		}
    +	}
    +
    +	// implement RelDataTypeFactory
    +	public RelDataType createTypeWithNullability(
    +		final RelDataType type,
    +		final boolean nullable) {
    +		RelDataType newType;
    +		if (type instanceof BasicSqlType) {
    +			BasicSqlType sqlType = (BasicSqlType) type;
    +			newType = sqlType.createWithNullability(nullable);
    +		} else if (type instanceof MapSqlType) {
    +			newType = copyMapType(type, nullable);
    +		} else if (type instanceof ArraySqlType) {
    +			newType = copyArrayType(type, nullable);
    +		} else if (type instanceof MultisetSqlType) {
    +			newType = copyMultisetType(type, nullable);
    +		} else if (type instanceof IntervalSqlType) {
    +			newType = copyIntervalType(type, nullable);
    +		} else if (type instanceof ObjectSqlType) {
    +			newType = copyObjectType(type, nullable);
    +		} else {
    +			return super.createTypeWithNullability(type, nullable);
    +		}
    +		return canonize(newType);
    +	}
    +
    +	private void assertBasic(SqlTypeName typeName) {
    +		assert typeName != null;
    +		assert typeName != SqlTypeName.MULTISET
    +			: "use createMultisetType() instead";
    +		assert !SqlTypeName.INTERVAL_TYPES.contains(typeName)
    +			: "use createSqlIntervalType() instead";
    +	}
    +
    +	private RelDataType leastRestrictiveSqlType(List<RelDataType> types) {
    +		RelDataType resultType = null;
    +		int nullCount = 0;
    +		int nullableCount = 0;
    +		int javaCount = 0;
    +		int anyCount = 0;
    +
    +		for (RelDataType type : types) {
    +			final SqlTypeName typeName = type.getSqlTypeName();
    +			if (typeName == null) {
    +				return null;
    +			}
    +			if (typeName == SqlTypeName.ANY) {
    +				anyCount++;
    +			}
    +			if (type.isNullable()) {
    +				++nullableCount;
    +			}
    +			if (typeName == SqlTypeName.NULL) {
    +				++nullCount;
    +			}
    +			if (isJavaType(type)) {
    +				++javaCount;
    +			}
    +		}
    +
    +		//  if any of the inputs are ANY, the output is ANY
    +		if (anyCount > 0) {
    +			// for flink, it should return GenericRelDataType
    +			if (types.get(0) instanceof GenericRelDataType &&
    --- End diff --
    
    We shouldn't copy Calcite code and add Flink dependencies to it. This class can never be removed in a later Calcite release. Isn't there a cleaner way to fix this issue? Please also add a header to this file (as we did with other Calcite files) that documents when we can remove it again from the Flink code base.


---