You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/08/21 22:24:43 UTC

[flink] 01/03: [FLINK-10127] [core] Add TypeInformation for java.time.Instant.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1798fd857f81337e7521ad284ee31e58c5d2be5
Author: Alexey Trenikhin <al...@gmail.com>
AuthorDate: Mon Aug 13 14:19:59 2018 -0700

    [FLINK-10127] [core] Add TypeInformation for java.time.Instant.
    
    This closes #6549.
---
 .../flink/api/common/typeinfo/BasicTypeInfo.java   |   6 ++
 .../apache/flink/api/common/typeinfo/Types.java    |   7 ++
 .../common/typeutils/base/InstantComparator.java   | 106 +++++++++++++++++++++
 .../common/typeutils/base/InstantSerializer.java   | 105 ++++++++++++++++++++
 .../typeutils/base/InstantComparatorTest.java      |  56 +++++++++++
 .../typeutils/base/InstantSerializerTest.java      |  67 +++++++++++++
 .../org/apache/flink/types/BasicTypeInfoTest.java  |   3 +-
 .../apache/flink/api/scala/typeutils/Types.scala   |   5 +
 8 files changed, 354 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index f19865e..00c4c31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.common.typeutils.base.DoubleComparator;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatComparator;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.InstantComparator;
+import org.apache.flink.api.common.typeutils.base.InstantSerializer;
 import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
@@ -53,6 +55,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import java.lang.reflect.Constructor;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
@@ -83,6 +86,8 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
 	public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new BasicTypeInfo<>(BigInteger.class, new Class<?>[]{}, BigIntSerializer.INSTANCE, BigIntComparator.class);
 	public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new BasicTypeInfo<>(BigDecimal.class, new Class<?>[]{}, BigDecSerializer.INSTANCE, BigDecComparator.class);
+	public static final BasicTypeInfo<Instant> INSTANT_TYPE_INFO = new BasicTypeInfo<>(Instant.class, new Class<?>[]{}, InstantSerializer.INSTANCE, InstantComparator.class);
+
 
 	// --------------------------------------------------------------------------------------------
 
@@ -250,5 +255,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 		TYPES.put(void.class, VOID_TYPE_INFO);
 		TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO);
 		TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO);
+		TYPES.put(Instant.class, INSTANT_TYPE_INFO);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index 0140e9c..8e7538a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -43,6 +43,7 @@ import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -150,6 +151,12 @@ public class Types {
 	 */
 	public static final TypeInformation<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
 
+
+	/**
+	 * Returns type infomation for {@link java.time.Instant}. Supports a null value.
+	 */
+	public static final TypeInformation<Instant> INSTANT = BasicTypeInfo.INSTANT_TYPE_INFO;
+
 	//CHECKSTYLE.OFF: MethodName
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
new file mode 100644
index 0000000..28a7be0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Comparator for comparing Java Instant.
+ */
+@Internal
+public final class InstantComparator extends BasicTypeComparator<Instant>{
+
+	private static final long serialVersionUID = 1L;
+	private static final long SECONDS_MIN_VALUE = Instant.MIN.getEpochSecond();
+
+	public InstantComparator(boolean ascending) {
+		super(ascending);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		final long lSeconds = firstSource.readLong();
+		final long rSeconds = secondSource.readLong();
+		final int comp;
+		if (lSeconds == rSeconds) {
+			final int lNanos = firstSource.readInt();
+			final int rNanos = secondSource.readInt();
+			comp = (lNanos < rNanos ? -1 : (lNanos == rNanos ? 0 : 1));
+		} else {
+			comp = lSeconds < rSeconds ? -1 : 1;
+		}
+		return ascendingComparison ? comp : -comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return true;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return InstantSerializer.SECONDS_BYTES + InstantSerializer.NANOS_BYTES;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(Instant record, MemorySegment target, int offset, int numBytes) {
+		final int secondsBytes = InstantSerializer.SECONDS_BYTES;
+		final long normalizedSeconds = record.getEpochSecond() - SECONDS_MIN_VALUE;
+		if (numBytes >= secondsBytes) {
+			target.putLongBigEndian(offset, normalizedSeconds);
+			offset += secondsBytes;
+			numBytes -= secondsBytes;
+
+			final int nanosBytes = InstantSerializer.NANOS_BYTES;
+			if (numBytes >= nanosBytes) {
+				target.putIntBigEndian(offset, record.getNano());
+				offset += nanosBytes;
+				numBytes -= nanosBytes;
+				for (int i = 0; i < numBytes; i++) {
+					target.put(offset + i, (byte) 0);
+				}
+			} else {
+				final int nanos = record.getNano();
+				for (int i = 0; i < numBytes;  i++) {
+					target.put(offset + i, (byte) (nanos >>> ((3 - i) << 3)));
+				}
+			}
+		} else {
+			for (int i = 0; i < numBytes; i++) {
+				target.put(offset + i, (byte) (normalizedSeconds >>> ((7 - i) << 3)));
+			}
+		}
+	}
+
+	@Override
+	public TypeComparator<Instant> duplicate() {
+		return new InstantComparator(ascendingComparison);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
new file mode 100644
index 0000000..2476ef6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Serializer for serializing/deserializing Instant values including null values.
+ */
+@Internal
+public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
+	static final int SECONDS_BYTES = Long.BYTES;
+	static final int NANOS_BYTES = Integer.BYTES;
+
+	private static final long NULL_SECONDS = Long.MIN_VALUE;
+	//Nanos of normal Instant is between 0 and 999,999,999,
+	//therefore we can use Integer.MIN_VALUE to represent NULL Instant
+	//regardless supported range of seconds
+	private static final int  NULL_NANOS = Integer.MIN_VALUE;
+
+	public static final InstantSerializer INSTANCE = new InstantSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public Instant createInstance() {
+		return Instant.EPOCH;
+	}
+
+	@Override
+	public Instant copy(Instant from) {
+		return from;
+	}
+
+	@Override
+	public Instant copy(Instant from, Instant reuse) {
+		return from;
+	}
+
+	@Override
+	public int getLength() {
+		return SECONDS_BYTES + NANOS_BYTES;
+	}
+
+	@Override
+	public void serialize(Instant record, DataOutputView target) throws IOException {
+		if (record == null) {
+			target.writeLong(NULL_SECONDS);
+			target.writeInt(NULL_NANOS);
+		} else {
+			target.writeLong(record.getEpochSecond());
+			target.writeInt(record.getNano());
+		}
+	}
+
+	@Override
+	public Instant deserialize(DataInputView source) throws IOException {
+		final long seconds = source.readLong();
+		final int nanos = source.readInt();
+		if (seconds == NULL_SECONDS && nanos == NULL_NANOS) {
+			return null;
+		}
+		return Instant.ofEpochSecond(seconds, nanos);
+	}
+
+	@Override
+	public Instant deserialize(Instant reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.writeLong(source.readLong());
+		target.writeInt(source.readInt());
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof InstantSerializer;
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
new file mode 100644
index 0000000..010df04
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+
+/**
+ * A test for the {@link InstantComparator}.
+ */
+public class InstantComparatorTest extends ComparatorTestBase<Instant> {
+
+	@Override
+	protected TypeComparator<Instant> createComparator(boolean ascending) {
+		return new InstantComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<Instant> createSerializer() {
+		return new InstantSerializer();
+	}
+
+	@Override
+	protected Instant[] getSortedTestData() {
+		return new Instant[] {
+			Instant.EPOCH,
+			Instant.parse("1970-01-01T00:00:00.001Z"),
+			Instant.parse("1990-10-14T02:42:25.123Z"),
+			Instant.parse("1990-10-14T02:42:25.123000001Z"),
+			Instant.parse("1990-10-14T02:42:25.123000002Z"),
+			Instant.parse("2013-08-12T14:15:59.478Z"),
+			Instant.parse("2013-08-12T14:15:59.479Z"),
+			Instant.parse("2040-05-12T18:00:45.999Z"),
+			Instant.MAX
+		};
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
new file mode 100644
index 0000000..7c14c69
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * A test for the {@link InstantSerializer}.
+ */
+public class InstantSerializerTest extends SerializerTestBase<Instant> {
+	@Override
+	protected TypeSerializer<Instant> createSerializer() {
+		return new InstantSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return 12;
+	}
+
+	@Override
+	protected Class<Instant> getTypeClass() {
+		return Instant.class;
+	}
+
+
+	private static long rndSeconds(Random rnd) {
+		return (long) (Instant.MIN.getEpochSecond()
+			+ rnd.nextDouble() * (Instant.MAX.getEpochSecond() - Instant.MIN.getEpochSecond()));
+	}
+
+	private static int rndNanos(Random rnd) {
+		return (int) (rnd.nextDouble() * 999999999);
+	}
+
+	@Override
+	protected Instant[] getTestData() {
+		final Random rnd = new Random(874597969123412341L);
+
+		return new Instant[] {
+			Instant.EPOCH, Instant.MIN, Instant.MAX,
+			Instant.ofEpochSecond(rndSeconds(rnd),	rndNanos(rnd)),
+			Instant.ofEpochSecond(1534135584,949495),
+			Instant.ofEpochSecond(56090783)
+		};
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
index 5707701..bbd75d7 100644
--- a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.time.Instant;
 import java.util.Date;
 
 import static org.junit.Assert.assertEquals;
@@ -33,7 +34,7 @@ public class BasicTypeInfoTest extends TestLogger {
 
 	static Class<?>[] classes = {String.class, Integer.class, Boolean.class, Byte.class,
 		Short.class, Long.class, Float.class, Double.class, Character.class, Date.class,
-		Void.class, BigInteger.class, BigDecimal.class};
+		Void.class, BigInteger.class, BigDecimal.class, Instant.class};
 
 	@Test
 	public void testBasicTypeInfoEquality() {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
index 4ce9b0f..a4ec6ed 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -157,6 +157,11 @@ object Types {
   val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
 
   /**
+    * Returns type information for [[java.time.Instant]]. Supports a null value.
+    */
+  val INSTANT: TypeInformation[java.time.Instant] = JTypes.INSTANT
+
+  /**
     * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
     * A row itself must not be null.
     *