You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2016/05/03 13:00:06 UTC

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/1959

    [FLINK-3856] [core] Create types for java.sql.Date/Time/Timestamp

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    This PR adds java.sql.Date/Time/Timestamp as basic types. I declared them PublicEvolving, therefore I didn't add the types to the documentation. I improved the Date serialization to use Long.MIN_VALUE instead of -1. But it still does not solve FLINK-3858 completely.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink DateTimeTimestamp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1959.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1959
    
----
commit b294fe3f1988582baf4b1d948d95e1efd5293d80
Author: twalthr <tw...@apache.org>
Date:   2016-05-02T14:31:45Z

    [FLINK-3856] [core] Create types for java.sql.Date/Time/Timestamp

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62054304
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import java.sql.Timestamp;
    +import org.apache.flink.api.common.typeutils.ComparatorTestBase;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +public class SqlTimestampComparatorTest extends ComparatorTestBase<Timestamp> {
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	protected TypeComparator<Timestamp> createComparator(boolean ascending) {
    +		return (TypeComparator) new SqlTimestampComparator(ascending);
    +	}
    +
    +	@Override
    +	protected TypeSerializer<Timestamp> createSerializer() {
    +		return new SqlTimestampSerializer();
    +	}
    +
    +	@Override
    +	protected Timestamp[] getSortedTestData() {
    +		return new Timestamp[] {
    +			Timestamp.valueOf("1970-01-01 00:00:00.000"),
    --- End diff --
    
    Add Timestamps that only differ in the nanos?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-218210159
  
    @twalthr, should `SqlTimeTypeInfo` implement `TypeInformation`? What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62054394
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import java.sql.Timestamp;
    +import org.apache.flink.api.common.typeutils.SerializerTestBase;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +/**
    + * A test for the {@link SqlTimestampSerializer}.
    + */
    +public class SqlTimestampSerializerTest extends SerializerTestBase<Timestamp> {
    +
    +	@Override
    +	protected TypeSerializer<Timestamp> createSerializer() {
    +		return new SqlTimestampSerializer();
    +	}
    +
    +	@Override
    +	protected int getLength() {
    +		return 12;
    +	}
    +
    +	@Override
    +	protected Class<Timestamp> getTypeClass() {
    +		return Timestamp.class;
    +	}
    +
    +	@Override
    +	protected Timestamp[] getTestData() {
    +		return new Timestamp[] {
    +			new Timestamp(0L),
    +			Timestamp.valueOf("1970-01-01 00:00:00.000"),
    --- End diff --
    
    Add a Timestamp with nanos to check their serialization?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62702515
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java ---
    @@ -0,0 +1,177 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeinfo;
    +
    +import java.lang.reflect.Constructor;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Objects;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.DateComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Type information for Java SQL Date/Time/Timestamp.
    + */
    +@PublicEvolving
    +public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
    --- End diff --
    
    Do we want to implement a new `TypeInformation` which is basically a copy of `BasicTypeInfo`? 
    
    Alternatively we could make `SqlTimeTypeInfo` just a holder for three `public static final BasicTypeInfo`s of the three time types. The `protected` constructor of `BasicTypeInfo` is visible if `SqlTimeTypeInfo` is in the same package as `BasicTypeInfo`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62823533
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java ---
    @@ -0,0 +1,177 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeinfo;
    +
    +import java.lang.reflect.Constructor;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Objects;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.DateComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Type information for Java SQL Date/Time/Timestamp.
    + */
    +@PublicEvolving
    +public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
    --- End diff --
    
    Ok, I will fix the unused import issues and merge once travis build passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62822604
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java ---
    @@ -0,0 +1,177 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeinfo;
    +
    +import java.lang.reflect.Constructor;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Objects;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.DateComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Type information for Java SQL Date/Time/Timestamp.
    + */
    +@PublicEvolving
    +public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
    --- End diff --
    
    I also thought about that. But this would also mean that `BasicTypeInfo`'s `getInfoFor` needs to support types that are declared in an other class. So we have to add the classes to the `TYPES` of `BasicTypeInfo`. I think complete separation is a nicer design.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-217836281
  
    I will move it to `SqlTimeTypeInfo`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62495413
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java ---
    @@ -59,15 +55,37 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
     	}
     
     	@Override
    -	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes) {
    -		long value = lValue.getTime() - Long.MIN_VALUE;
    -		
    +	public void putNormalizedKey(Date record, MemorySegment target, int offset, int numBytes) {
    +		putNormalizedKeyDate(record, target, offset, numBytes);
    +	}
    +
    +	@Override
    +	public DateComparator duplicate() {
    +		return new DateComparator(ascendingComparison);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//                           Static Helpers for Date Comparison
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static int compareSerializedDate(DataInputView firstSource, DataInputView secondSource,
    +			boolean ascendingComparison) throws IOException {
    +		final long l1 = firstSource.readLong();
    +		final long l2 = secondSource.readLong();
    +		final int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	public static void putNormalizedKeyDate(Date record, MemorySegment target, int offset, int numBytes) {
    +		final long value = record.getTime() - Long.MIN_VALUE;
    +
     		// see IntValue for an explanation of the logic
     		if (numBytes == 8) {
     			// default case, full normalized key
     			target.putLongBigEndian(offset, value);
     		}
     		else if (numBytes <= 0) {
    --- End diff --
    
    Thanks for the explanation. You are right. I have changed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-216511830
  
    Can these type infos exist independent of the BasicTypeInfo?
    Either in some class like SQL type infos, or even only inside the Table API / SQL project?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1959


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62054682
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java ---
    @@ -59,15 +55,37 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
     	}
     
     	@Override
    -	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes) {
    -		long value = lValue.getTime() - Long.MIN_VALUE;
    -		
    +	public void putNormalizedKey(Date record, MemorySegment target, int offset, int numBytes) {
    +		putNormalizedKeyDate(record, target, offset, numBytes);
    +	}
    +
    +	@Override
    +	public DateComparator duplicate() {
    +		return new DateComparator(ascendingComparison);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//                           Static Helpers for Date Comparison
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static int compareSerializedDate(DataInputView firstSource, DataInputView secondSource,
    +			boolean ascendingComparison) throws IOException {
    +		final long l1 = firstSource.readLong();
    +		final long l2 = secondSource.readLong();
    +		final int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	public static void putNormalizedKeyDate(Date record, MemorySegment target, int offset, int numBytes) {
    +		final long value = record.getTime() - Long.MIN_VALUE;
    +
     		// see IntValue for an explanation of the logic
     		if (numBytes == 8) {
     			// default case, full normalized key
     			target.putLongBigEndian(offset, value);
     		}
     		else if (numBytes <= 0) {
    --- End diff --
    
    This case can be removed, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62493248
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java ---
    @@ -59,15 +55,37 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
     	}
     
     	@Override
    -	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes) {
    -		long value = lValue.getTime() - Long.MIN_VALUE;
    -		
    +	public void putNormalizedKey(Date record, MemorySegment target, int offset, int numBytes) {
    +		putNormalizedKeyDate(record, target, offset, numBytes);
    +	}
    +
    +	@Override
    +	public DateComparator duplicate() {
    +		return new DateComparator(ascendingComparison);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//                           Static Helpers for Date Comparison
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static int compareSerializedDate(DataInputView firstSource, DataInputView secondSource,
    +			boolean ascendingComparison) throws IOException {
    +		final long l1 = firstSource.readLong();
    +		final long l2 = secondSource.readLong();
    +		final int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	public static void putNormalizedKeyDate(Date record, MemorySegment target, int offset, int numBytes) {
    +		final long value = record.getTime() - Long.MIN_VALUE;
    +
     		// see IntValue for an explanation of the logic
     		if (numBytes == 8) {
     			// default case, full normalized key
     			target.putLongBigEndian(offset, value);
     		}
     		else if (numBytes <= 0) {
    --- End diff --
    
    for `numBytes == 0`, the `numBytes < 8` branch would match and the loop would not be entered, because `numBytes > 0`. So the check is not necessary and only adds overhead for some cases. Actually, `numBytes` should be larger than `0` because otherwise calling this method would not make sense.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62823173
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java ---
    @@ -0,0 +1,177 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeinfo;
    +
    +import java.lang.reflect.Constructor;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.Objects;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.DateComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
    +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Type information for Java SQL Date/Time/Timestamp.
    + */
    +@PublicEvolving
    +public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
    --- End diff --
    
    I agree. Let's stick to this solution then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-216893377
  
    Just a few minor comments. What do you think about moving the types to a `SqlTimeTypeInfo` class, @twalthr? @StephanEwen would that be OK for you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1959#discussion_r62490034
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java ---
    @@ -59,15 +55,37 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
     	}
     
     	@Override
    -	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes) {
    -		long value = lValue.getTime() - Long.MIN_VALUE;
    -		
    +	public void putNormalizedKey(Date record, MemorySegment target, int offset, int numBytes) {
    +		putNormalizedKeyDate(record, target, offset, numBytes);
    +	}
    +
    +	@Override
    +	public DateComparator duplicate() {
    +		return new DateComparator(ascendingComparison);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//                           Static Helpers for Date Comparison
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static int compareSerializedDate(DataInputView firstSource, DataInputView secondSource,
    +			boolean ascendingComparison) throws IOException {
    +		final long l1 = firstSource.readLong();
    +		final long l2 = secondSource.readLong();
    +		final int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	public static void putNormalizedKeyDate(Date record, MemorySegment target, int offset, int numBytes) {
    +		final long value = record.getTime() - Long.MIN_VALUE;
    +
     		// see IntValue for an explanation of the logic
     		if (numBytes == 8) {
     			// default case, full normalized key
     			target.putLongBigEndian(offset, value);
     		}
     		else if (numBytes <= 0) {
    --- End diff --
    
    No, this case is for numBytes of 0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-216513630
  
    These types would also be useful for `flink-jdbc` and possibly other modules. We can move them to a dedicated `TimeTypeInfo` or `SqlTimeTypeInfo` class, but I think they should be part of `flink-core`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1959#issuecomment-218967984
  
    Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---