You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/08/21 22:24:43 UTC
[flink] 01/03: [FLINK-10127] [core] Add TypeInformation for
java.time.Instant.
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e1798fd857f81337e7521ad284ee31e58c5d2be5
Author: Alexey Trenikhin <al...@gmail.com>
AuthorDate: Mon Aug 13 14:19:59 2018 -0700
[FLINK-10127] [core] Add TypeInformation for java.time.Instant.
This closes #6549.
---
.../flink/api/common/typeinfo/BasicTypeInfo.java | 6 ++
.../apache/flink/api/common/typeinfo/Types.java | 7 ++
.../common/typeutils/base/InstantComparator.java | 106 +++++++++++++++++++++
.../common/typeutils/base/InstantSerializer.java | 105 ++++++++++++++++++++
.../typeutils/base/InstantComparatorTest.java | 56 +++++++++++
.../typeutils/base/InstantSerializerTest.java | 67 +++++++++++++
.../org/apache/flink/types/BasicTypeInfoTest.java | 3 +-
.../apache/flink/api/scala/typeutils/Types.scala | 5 +
8 files changed, 354 insertions(+), 1 deletion(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index f19865e..00c4c31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.common.typeutils.base.DoubleComparator;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatComparator;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.InstantComparator;
+import org.apache.flink.api.common.typeutils.base.InstantSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
@@ -53,6 +55,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
@@ -83,6 +86,8 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new BasicTypeInfo<>(BigInteger.class, new Class<?>[]{}, BigIntSerializer.INSTANCE, BigIntComparator.class);
public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new BasicTypeInfo<>(BigDecimal.class, new Class<?>[]{}, BigDecSerializer.INSTANCE, BigDecComparator.class);
+ public static final BasicTypeInfo<Instant> INSTANT_TYPE_INFO = new BasicTypeInfo<>(Instant.class, new Class<?>[]{}, InstantSerializer.INSTANCE, InstantComparator.class);
+
// --------------------------------------------------------------------------------------------
@@ -250,5 +255,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
TYPES.put(void.class, VOID_TYPE_INFO);
TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO);
TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO);
+ TYPES.put(Instant.class, INSTANT_TYPE_INFO);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index 0140e9c..8e7538a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -43,6 +43,7 @@ import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -150,6 +151,12 @@ public class Types {
*/
public static final TypeInformation<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+
+ /**
+ * Returns type information for {@link java.time.Instant}. Supports a null value.
+ */
+ public static final TypeInformation<Instant> INSTANT = BasicTypeInfo.INSTANT_TYPE_INFO;
+
//CHECKSTYLE.OFF: MethodName
/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
new file mode 100644
index 0000000..28a7be0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Comparator for {@link Instant} values.
+ *
+ * <p>Serialized comparison reads an 8-byte epoch-second long followed by a 4-byte
+ * nanosecond int from each input, which is the layout {@link InstantSerializer} writes.
+ *
+ * <p>The normalized key is up to 12 bytes long: the epoch second shifted by
+ * {@code Instant.MIN.getEpochSecond()} (so the smallest Instant maps to 0), followed by
+ * the nanosecond part, both stored most significant byte first. Shorter keys are
+ * prefixes of that full key.
+ */
+@Internal
+public final class InstantComparator extends BasicTypeComparator<Instant>{
+
+ private static final long serialVersionUID = 1L;
+ private static final long SECONDS_MIN_VALUE = Instant.MIN.getEpochSecond(); // offset subtracted from epoch seconds when building normalized keys
+
+ public InstantComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ final long lSeconds = firstSource.readLong();
+ final long rSeconds = secondSource.readLong();
+ final int comp;
+ if (lSeconds == rSeconds) { // the nanos are only read when the seconds tie
+ final int lNanos = firstSource.readInt();
+ final int rNanos = secondSource.readInt();
+ comp = (lNanos < rNanos ? -1 : (lNanos == rNanos ? 0 : 1));
+ } else {
+ comp = lSeconds < rSeconds ? -1 : 1;
+ }
+ return ascendingComparison ? comp : -comp; // flip the sign for descending order
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return InstantSerializer.SECONDS_BYTES + InstantSerializer.NANOS_BYTES; // 8 bytes of seconds + 4 bytes of nanos
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < getNormalizeKeyLen(); // anything shorter than the full key is only a prefix
+ }
+
+ @Override
+ public void putNormalizedKey(Instant record, MemorySegment target, int offset, int numBytes) {
+ final int secondsBytes = InstantSerializer.SECONDS_BYTES;
+ final long normalizedSeconds = record.getEpochSecond() - SECONDS_MIN_VALUE; // shift so that Instant.MIN becomes 0
+ if (numBytes >= secondsBytes) { // room for the complete seconds part
+ target.putLongBigEndian(offset, normalizedSeconds);
+ offset += secondsBytes;
+ numBytes -= secondsBytes;
+
+ final int nanosBytes = InstantSerializer.NANOS_BYTES;
+ if (numBytes >= nanosBytes) { // room for the complete nanos part as well
+ target.putIntBigEndian(offset, record.getNano());
+ offset += nanosBytes;
+ numBytes -= nanosBytes;
+ for (int i = 0; i < numBytes; i++) { // zero-fill whatever key space is left over
+ target.put(offset + i, (byte) 0);
+ }
+ } else {
+ final int nanos = record.getNano();
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) (nanos >>> ((3 - i) << 3))); // most significant nanos bytes first
+ }
+ }
+ } else {
+ for (int i = 0; i < numBytes; i++) {
+ target.put(offset + i, (byte) (normalizedSeconds >>> ((7 - i) << 3))); // most significant seconds bytes first
+ }
+ }
+ }
+
+ @Override
+ public TypeComparator<Instant> duplicate() {
+ return new InstantComparator(ascendingComparison); // fresh instance with the same sort direction
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
new file mode 100644
index 0000000..2476ef6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Serializer for serializing/deserializing {@link Instant} values including null values.
+ *
+ * <p>Every value is written as an 8-byte epoch-second long followed by a 4-byte
+ * nanosecond int, giving a fixed length of 12 bytes. A {@code null} Instant is written
+ * as the pair ({@code Long.MIN_VALUE}, {@code Integer.MIN_VALUE}).
+ */
+@Internal
+public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
+ static final int SECONDS_BYTES = Long.BYTES;
+ static final int NANOS_BYTES = Integer.BYTES;
+
+ private static final long NULL_SECONDS = Long.MIN_VALUE;
+ // The nanos of a regular Instant are always between 0 and 999,999,999,
+ // therefore Integer.MIN_VALUE can be used to mark a null Instant
+ // regardless of the supported range of seconds.
+ private static final int NULL_NANOS = Integer.MIN_VALUE;
+
+ public static final InstantSerializer INSTANCE = new InstantSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public Instant createInstance() {
+ return Instant.EPOCH;
+ }
+
+ @Override
+ public Instant copy(Instant from) {
+ return from; // the type is declared immutable above, so the same reference is returned
+ }
+
+ @Override
+ public Instant copy(Instant from, Instant reuse) {
+ return from; // the reuse instance is ignored
+ }
+
+ @Override
+ public int getLength() {
+ return SECONDS_BYTES + NANOS_BYTES;
+ }
+
+ @Override
+ public void serialize(Instant record, DataOutputView target) throws IOException {
+ if (record == null) { // null is encoded with the two marker values
+ target.writeLong(NULL_SECONDS);
+ target.writeInt(NULL_NANOS);
+ } else {
+ target.writeLong(record.getEpochSecond());
+ target.writeInt(record.getNano());
+ }
+ }
+
+ @Override
+ public Instant deserialize(DataInputView source) throws IOException {
+ final long seconds = source.readLong();
+ final int nanos = source.readInt();
+ if (seconds == NULL_SECONDS && nanos == NULL_NANOS) { // both markers present: the value was null
+ return null;
+ }
+ return Instant.ofEpochSecond(seconds, nanos);
+ }
+
+ @Override
+ public Instant deserialize(Instant reuse, DataInputView source) throws IOException {
+ return deserialize(source); // the reuse instance is ignored
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeLong(source.readLong()); // seconds, passed through without building an Instant
+ target.writeInt(source.readInt()); // nanos
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof InstantSerializer;
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
new file mode 100644
index 0000000..010df04
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+
+/**
+ * A test for the {@link InstantComparator}.
+ *
+ * <p>Supplies the comparator, an {@link InstantSerializer} and a strictly ascending set
+ * of sample values to {@link ComparatorTestBase}.
+ */
+public class InstantComparatorTest extends ComparatorTestBase<Instant> {
+
+ @Override
+ protected TypeComparator<Instant> createComparator(boolean ascending) {
+ return new InstantComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<Instant> createSerializer() {
+ return new InstantSerializer();
+ }
+
+ @Override
+ protected Instant[] getSortedTestData() {
+ return new Instant[] {
+ Instant.EPOCH,
+ Instant.parse("1970-01-01T00:00:00.001Z"),
+ Instant.parse("1990-10-14T02:42:25.123Z"),
+ Instant.parse("1990-10-14T02:42:25.123000001Z"), // same second as the previous entry, one nanosecond later
+ Instant.parse("1990-10-14T02:42:25.123000002Z"),
+ Instant.parse("2013-08-12T14:15:59.478Z"),
+ Instant.parse("2013-08-12T14:15:59.479Z"),
+ Instant.parse("2040-05-12T18:00:45.999Z"),
+ Instant.MAX // upper bound of the supported Instant range
+ };
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
new file mode 100644
index 0000000..7c14c69
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * A test for the {@link InstantSerializer}.
+ *
+ * <p>Supplies the serializer, its expected fixed length and a set of sample values
+ * (range boundaries, a seeded random value and two fixed values) to
+ * {@link SerializerTestBase}.
+ */
+public class InstantSerializerTest extends SerializerTestBase<Instant> {
+ @Override
+ protected TypeSerializer<Instant> createSerializer() {
+ return new InstantSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 12; // 8 bytes of seconds + 4 bytes of nanos
+ }
+
+ @Override
+ protected Class<Instant> getTypeClass() {
+ return Instant.class;
+ }
+
+
+ private static long rndSeconds(Random rnd) { // random epoch second between Instant.MIN and Instant.MAX
+ return (long) (Instant.MIN.getEpochSecond()
+ + rnd.nextDouble() * (Instant.MAX.getEpochSecond() - Instant.MIN.getEpochSecond()));
+ }
+
+ private static int rndNanos(Random rnd) {
+ return (int) (rnd.nextDouble() * 999999999); // nextDouble() is below 1.0, so the result stays below 999,999,999
+ }
+
+ @Override
+ protected Instant[] getTestData() {
+ final Random rnd = new Random(874597969123412341L); // fixed seed keeps the random sample reproducible
+
+ return new Instant[] {
+ Instant.EPOCH, Instant.MIN, Instant.MAX,
+ Instant.ofEpochSecond(rndSeconds(rnd), rndNanos(rnd)),
+ Instant.ofEpochSecond(1534135584,949495),
+ Instant.ofEpochSecond(56090783)
+ };
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
index 5707701..bbd75d7 100644
--- a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.time.Instant;
import java.util.Date;
import static org.junit.Assert.assertEquals;
@@ -33,7 +34,7 @@ public class BasicTypeInfoTest extends TestLogger {
static Class<?>[] classes = {String.class, Integer.class, Boolean.class, Byte.class,
Short.class, Long.class, Float.class, Double.class, Character.class, Date.class,
- Void.class, BigInteger.class, BigDecimal.class};
+ Void.class, BigInteger.class, BigDecimal.class, Instant.class};
@Test
public void testBasicTypeInfoEquality() {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
index 4ce9b0f..a4ec6ed 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -157,6 +157,11 @@ object Types {
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
/**
+ * Returns type information for [[java.time.Instant]]. Supports a null value.
+ */
+ val INSTANT: TypeInformation[java.time.Instant] = JTypes.INSTANT
+
+ /**
* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
* A row itself must not be null.
*