Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/15 19:02:01 UTC

[GitHub] [iceberg] openinx opened a new pull request #1175: Flink: Add wrapper to adapt Row to StructLike

openinx opened a new pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175


   This patch abstracts the common code of `PartitionKey` into the newly introduced class `BasePartitionKey`; both the Spark `PartitionKey` and the Flink `PartitionKey` extend this base class. I also added unit tests for the Flink `PartitionKey`.
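
   Roughly, the split described above has the following shape. This is only an illustrative sketch, not the actual patch: the class and method names below are simplified stand-ins, and accessor construction and partition transforms are omitted.

```java
import org.apache.iceberg.StructLike;

// Illustrative base class: the shared partition-tuple plumbing lives here, while an
// engine-specific subclass only defines how to read a field from its row type.
// Accessor construction and partition transforms are omitted for brevity.
abstract class BasePartitionKeySketch<R> implements StructLike {
  private final Object[] partitionTuple;

  protected BasePartitionKeySketch(int size) {
    this.partitionTuple = new Object[size];
  }

  // Spark's InternalRow and Flink's Row differ only in how a field is read.
  protected abstract Object getField(R row, int pos);

  public void partition(R row) {
    for (int i = 0; i < partitionTuple.length; i += 1) {
      partitionTuple[i] = getField(row, i);
    }
  }

  @Override
  public int size() {
    return partitionTuple.length;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    return javaClass.cast(partitionTuple[pos]);
  }

  @Override
  public <T> void set(int pos, T value) {
    partitionTuple[pos] = value;
  }
}
```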


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-658954992


   @openinx, looks like this still needs the test cases from `TestPartitionValues`.




[GitHub] [iceberg] rdblue commented on a change in pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r455279084



##########
File path: flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.lang.reflect.Array;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class RowWrapper implements StructLike {
+
+  private final Type[] types;
+  private final PositionalGetter[] getters;
+  private Row row = null;
+
+  public RowWrapper(Types.StructType type) {
+    int size = type.fields().size();
+
+    types = (Type[]) Array.newInstance(Type.class, size);
+    for (int i = 0; i < size; i++) {
+      types[i] = type.fields().get(i).type();
+    }
+
+    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+    for (int i = 0; i < size; i++) {
+      getters[i] = buildGetter(types[i]);
+    }
+  }
+
+  RowWrapper wrap(Row data) {
+    this.row = data;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (row.getField(pos) == null) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].get(row, pos));
+    }
+
+    return javaClass.cast(row.getField(pos));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    row.setField(pos, value);
+  }
+
+  private interface PositionalGetter<T> {
+    T get(Row row, int pos);
+  }
+
+  private static PositionalGetter buildGetter(Type type) {
+    if (type instanceof Types.StructType) {

Review comment:
       Yes, this needs to convert to the representation that internal classes use.
   
   Iceberg's generic data model is intended for passing data to and from Java applications, which is why it uses friendlier classes. It is up to data models like Iceberg generics or Flink's data model to convert to that representation. Iceberg core should modify data as little as possible.






[GitHub] [iceberg] rdblue commented on a change in pull request #1175: Flink: add flink row PartitionKey.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r454509064



##########
File path: flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.lang.reflect.Array;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class RowWrapper implements StructLike {
+
+  private final Type[] types;
+  private final PositionalGetter[] getters;
+  private Row row = null;
+
+  public RowWrapper(Types.StructType type) {
+    int size = type.fields().size();
+
+    types = (Type[]) Array.newInstance(Type.class, size);
+    for (int i = 0; i < size; i++) {
+      types[i] = type.fields().get(i).type();
+    }
+
+    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+    for (int i = 0; i < size; i++) {
+      getters[i] = buildGetter(types[i]);
+    }
+  }
+
+  RowWrapper wrap(Row data) {
+    this.row = data;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (row.getField(pos) == null) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].get(row, pos));
+    }
+
+    return javaClass.cast(row.getField(pos));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    row.setField(pos, value);
+  }
+
+  private interface PositionalGetter<T> {
+    T get(Row row, int pos);
+  }
+
+  private static PositionalGetter buildGetter(Type type) {
+    if (type instanceof Types.StructType) {

Review comment:
       The objects returned by this wrapper need to be Iceberg's internal representation:
   * `int` for `DateType`: number of days from epoch
   * `long` for `TimeType`: number of microseconds from midnight
   * `long` for both `TimestampType`: number of microseconds from epoch
   * `ByteBuffer` for both `fixed(L)` and `binary` types
   * `BigDecimal` for `decimal(P, S)`
   
   Because Flink uses the same in-memory representation as Iceberg generics, this should use the [same conversions](https://github.com/apache/iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java#L46-L64) that we use for `Record`.
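
   A minimal sketch of such getters, in the spirit of the `InternalRecordWrapper` conversions linked above. It is illustrative rather than the PR's code, and it assumes the incoming Flink `Row` carries `java.time` values and `byte[]` for fixed/binary:

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import org.apache.flink.types.Row;
import org.apache.iceberg.util.DateTimeUtil;

class InternalConversionSketch {
  // DateType -> int: days from epoch
  static int toDays(Row row, int pos) {
    return DateTimeUtil.daysFromDate((LocalDate) row.getField(pos));
  }

  // TimeType -> long: microseconds from midnight
  static long toTimeMicros(Row row, int pos) {
    return DateTimeUtil.microsFromTime((LocalTime) row.getField(pos));
  }

  // TimestampType.withoutZone() -> long: microseconds from epoch
  static long toTimestampMicros(Row row, int pos) {
    return DateTimeUtil.microsFromTimestamp((LocalDateTime) row.getField(pos));
  }

  // TimestampType.withZone() -> long: microseconds from epoch
  static long toTimestamptzMicros(Row row, int pos) {
    return DateTimeUtil.microsFromTimestamptz((OffsetDateTime) row.getField(pos));
  }

  // fixed(L) and binary -> ByteBuffer
  static ByteBuffer toByteBuffer(Row row, int pos) {
    return ByteBuffer.wrap((byte[]) row.getField(pos));
  }

  // decimal(P, S) is already a BigDecimal, so no conversion is needed
}
```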






[GitHub] [iceberg] rdblue commented on pull request #1175: Generilize the BasePartitionkey to abstract the common codes for spark and flink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-656420938


   @openinx, now that the RC for 0.9.0 is out, I should be able to pick back up on Flink reviews tomorrow. I'll probably start with this one since we need to clean this up. Thanks!




[GitHub] [iceberg] openinx commented on a change in pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r454774756



##########
File path: flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.lang.reflect.Array;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class RowWrapper implements StructLike {
+
+  private final Type[] types;
+  private final PositionalGetter[] getters;
+  private Row row = null;
+
+  public RowWrapper(Types.StructType type) {
+    int size = type.fields().size();
+
+    types = (Type[]) Array.newInstance(Type.class, size);
+    for (int i = 0; i < size; i++) {
+      types[i] = type.fields().get(i).type();
+    }
+
+    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+    for (int i = 0; i < size; i++) {
+      getters[i] = buildGetter(types[i]);
+    }
+  }
+
+  RowWrapper wrap(Row data) {
+    this.row = data;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (row.getField(pos) == null) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].get(row, pos));
+    }
+
+    return javaClass.cast(row.getField(pos));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    row.setField(pos, value);
+  }
+
+  private interface PositionalGetter<T> {
+    T get(Row row, int pos);
+  }
+
+  private static PositionalGetter buildGetter(Type type) {
+    if (type instanceof Types.StructType) {

Review comment:
   Thanks for the details. We discussed this [here](https://github.com/apache/iceberg/pull/1197#discussion_r454285996); maybe you want to take a look :-)






[GitHub] [iceberg] rdblue removed a comment on pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue removed a comment on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-658954992


   @openinx, looks like this still needs the test cases from `TestPartitionValues`.




[GitHub] [iceberg] rdblue commented on a change in pull request #1175: Flink: add flink row PartitionKey.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r454511730



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestPartitionKey.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPartitionKey {

Review comment:
       Can you add a test based on Spark's `TestPartitionValues`? That tests every supported type, null values, and different column orders.
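
   For reference, a minimal sketch of one case such a test should cover (a null partition source value), using the `RowWrapper`/`PartitionKey` pattern already in this file; the test name is illustrative:

```java
@Test
public void testNullPartitionValue() {
  Schema schema = new Schema(
      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
      Types.NestedField.optional(2, "data", Types.StringType.get()));
  PartitionSpec spec = PartitionSpec.builderFor(schema).identity("data").build();

  RowWrapper rowWrapper = new RowWrapper(schema.asStruct());
  PartitionKey partitionKey = new PartitionKey(spec, schema);

  // a null source column should produce a null partition value
  partitionKey.partition(rowWrapper.wrap(Row.of(1, null)));
  Assert.assertEquals(1, partitionKey.size());
  Assert.assertNull(partitionKey.get(0, String.class));
}
```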






[GitHub] [iceberg] openinx commented on pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-658676419


   Addressed all the comments. Please take another look, thanks @rdblue.




[GitHub] [iceberg] rdblue commented on a change in pull request #1175: Flink: add flink row PartitionKey.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r454512523



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestPartitionKey.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPartitionKey {
+
+  @Test
+  public void testSimplePartition() {
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "address", Types.StringType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .identity("address")
+        .build();
+    RowWrapper rowWrapper = new RowWrapper(schema.asStruct());
+
+    Row row1 = Row.of(101, "hello", "addr-1");
+    PartitionKey partitionKey = new PartitionKey(spec, schema);
+    partitionKey.partition(rowWrapper.wrap(row1));
+    Assert.assertEquals(partitionKey.size(), 1);
+    Assert.assertEquals(partitionKey.get(0, String.class), "addr-1");
+
+    Row row2 = Row.of(102, "world", "addr-2");
+    partitionKey.partition(rowWrapper.wrap(row2));
+    Assert.assertEquals(partitionKey.size(), 1);
+    Assert.assertEquals(partitionKey.get(0, String.class), "addr-2");
+  }
+
+  @Test
+  public void testPartitionWithNestedType() {
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "structType", Types.StructType.of(
+            Types.NestedField.optional(3, "innerStringType", Types.StringType.get()),
+            Types.NestedField.optional(4, "innerIntegerType", Types.IntegerType.get())
+        )),
+        Types.NestedField.optional(5, "listType", Types.ListType.ofOptional(6, Types.LongType.get())),
+        Types.NestedField.optional(7, "mapType",
+            Types.MapType.ofRequired(8, 9, Types.IntegerType.get(), Types.StringType.get())),
+        Types.NestedField.required(10, "ts", Types.TimestampType.withZone())
+    );
+    RowWrapper rowWrapper = new RowWrapper(schema.asStruct());
+
+    Row row = Row.of(
+        1001,
+        Row.of("addr-1", 200),
+        new Long[] {101L, 102L},
+        ImmutableMap.of(1001, "1001-value"),
+        DateTimeUtil.microsFromTimestamp(DateTimeUtil.timestampFromMicros(0L))
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .identity("structType.innerStringType")
+        .build();
+    PartitionKey partitionKey = new PartitionKey(spec, schema);
+    partitionKey.partition(rowWrapper.wrap(row));
+    Assert.assertEquals(partitionKey.size(), 1);
+    Assert.assertEquals(partitionKey.get(0, String.class), "addr-1");
+    Assert.assertEquals(partitionKey.toPath(), "structType.innerStringType=addr-1");
+
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema)
+        .identity("structType.innerIntegerType")
+        .build();
+    PartitionKey partitionKey2 = new PartitionKey(spec2, schema);
+    partitionKey2.partition(rowWrapper.wrap(row));
+    Assert.assertEquals(1, partitionKey2.size());
+    Assert.assertEquals(200, (int) partitionKey2.get(0, Integer.class));
+    Assert.assertEquals(partitionKey2.toPath(), "structType.innerIntegerType=200");

Review comment:
   Can you split each of these blocks into a separate test case? There are lots of different cases mixed together in this method. Mixing cases makes it harder to see what is broken when tests fail: once one assertion fails, the later cases never run, so you don't get a picture of what the failing cases have in common.
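
   For example, the two nested-struct blocks above could become separate methods along these lines (assuming the shared `schema`, `rowWrapper`, and `row` setup is pulled into fields or a helper; the names are illustrative):

```java
@Test
public void testPartitionByNestedStringField() {
  PartitionSpec spec = PartitionSpec.builderFor(schema)
      .identity("structType.innerStringType")
      .build();
  PartitionKey partitionKey = new PartitionKey(spec, schema);

  partitionKey.partition(rowWrapper.wrap(row));
  Assert.assertEquals("structType.innerStringType=addr-1", partitionKey.toPath());
}

@Test
public void testPartitionByNestedIntegerField() {
  PartitionSpec spec = PartitionSpec.builderFor(schema)
      .identity("structType.innerIntegerType")
      .build();
  PartitionKey partitionKey = new PartitionKey(spec, schema);

  partitionKey.partition(rowWrapper.wrap(row));
  Assert.assertEquals("structType.innerIntegerType=200", partitionKey.toPath());
}
```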






[GitHub] [iceberg] rdblue commented on pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-658989103


   I reopened this to run tests against master with the CI fix. It's passing tests so I'll merge it. Thanks, @openinx!




[GitHub] [iceberg] openinx commented on a change in pull request #1175: Generilize the BasePartitionkey to abstract the common codes for spark and flink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r450827719



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
##########
@@ -176,61 +70,22 @@ public int hashCode() {
     }
   }
 
-  private static Accessor<InternalRow> newAccessor(int position, boolean isOptional, Types.StructType type,
-                                                   Accessor<InternalRow> accessor) {
+  @Override
+  protected Accessor<InternalRow> newAccessor(int position, boolean isOptional, Types.StructType type,
+                                              Accessor<InternalRow> accessor) {
     int size = type.fields().size();
     if (isOptional) {
       // the wrapped position handles null layers
       return new WrappedPositionAccessor(position, size, accessor);
     } else if (accessor.getClass() == PositionAccessor.class) {
       return new Position2Accessor(position, size, (PositionAccessor) accessor);
-    } else if (accessor instanceof Position2Accessor) {
+    } else if (accessor.getClass() == Position2Accessor.class) {

Review comment:
   It's quite confusing to see that the else-if above uses `accessor.getClass() == PositionAccessor.class` while this line used `accessor instanceof Position2Accessor`. Since there's only one `Position2Accessor` implementation, changing it from `instanceof` to `==` should be OK here.






[GitHub] [iceberg] rdblue commented on pull request #1175: Generilize the BasePartitionkey to abstract the common codes for spark and flink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-656919000


   @openinx, I opened an alternative to this PR, #1195. Please take a look.
   
   This solution looks fairly clean for producing a `PartitionKey` for a specific format, but it requires building a subclass of `PartitionKey` for every row representation as well as new `Accessor` classes. I'd like to make it possible to reuse the existing `PartitionKey` class as well as the existing `Accessor` implementations (produced by `Schema.accessorForField(id)`) that are currently used for expression evaluation.
   
   The approach I took in the other PR is to reuse the existing accessors, which accept a `StructLike`. To make that work, I just needed to add a wrapper class that adapts Spark's `InternalRow` to `StructLike`, and that converts Spark objects to Iceberg's internal representation. I think that's going to be a better long-term approach than multiple `PartitionKey` classes.
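
   On the Flink side, the same idea with the `RowWrapper` from this PR looks roughly like the sketch below. It is illustrative only, and it assumes it lives in `org.apache.iceberg.flink` so that the package-private `wrap` is visible:

```java
package org.apache.iceberg.flink;

import org.apache.flink.types.Row;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;

public class AccessorReuseSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // The accessors already used for expression evaluation operate on StructLike, so
    // wrapping a Flink Row is enough to reuse them; no new Accessor classes are needed.
    Accessor<StructLike> dataAccessor = schema.accessorForField(2);
    RowWrapper wrapper = new RowWrapper(schema.asStruct());

    Object value = dataAccessor.get(wrapper.wrap(Row.of(1, "hello")));
    System.out.println(value);  // hello
  }
}
```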




[GitHub] [iceberg] rdblue merged pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1175: Generilize the BasePartitionkey to abstract the common codes for spark and flink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#discussion_r453073671



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
##########
@@ -176,61 +70,22 @@ public int hashCode() {
     }
   }
 
-  private static Accessor<InternalRow> newAccessor(int position, boolean isOptional, Types.StructType type,
-                                                   Accessor<InternalRow> accessor) {
+  @Override
+  protected Accessor<InternalRow> newAccessor(int position, boolean isOptional, Types.StructType type,
+                                              Accessor<InternalRow> accessor) {
     int size = type.fields().size();
     if (isOptional) {
       // the wrapped position handles null layers
       return new WrappedPositionAccessor(position, size, accessor);
     } else if (accessor.getClass() == PositionAccessor.class) {
       return new Position2Accessor(position, size, (PositionAccessor) accessor);
-    } else if (accessor instanceof Position2Accessor) {
+    } else if (accessor.getClass() == Position2Accessor.class) {

Review comment:
       Yes, this should be fine and is also more correct. Originally, both used `instanceof`, but there was a bug when the behavior of `PositionAccessor` was overridden. Replacing a `PositionAccessor` subclass with a `Position2Accessor` removed the subclass's behavior. It is only safe to replace a `PositionAccessor`, not a subclass.
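
   A tiny standalone illustration of the difference (the classes here are hypothetical stand-ins, not Iceberg's real accessors):

```java
public class ExactClassCheckExample {
  static class PositionAccessor {}
  // hypothetical subclass that overrides PositionAccessor behavior
  static class CustomPositionAccessor extends PositionAccessor {}

  public static void main(String[] args) {
    PositionAccessor accessor = new CustomPositionAccessor();

    // instanceof also matches subclasses, so the subclass's overridden behavior
    // would be silently replaced when the accessor is rewrapped:
    System.out.println(accessor instanceof PositionAccessor);           // true

    // the exact-class check only matches the plain PositionAccessor, so a
    // subclass is left alone:
    System.out.println(accessor.getClass() == PositionAccessor.class);  // false
  }
}
```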






[GitHub] [iceberg] openinx commented on pull request #1175: Flink: add flink row PartitionKey.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175#issuecomment-658158027


   Ping @rdblue for review.




[GitHub] [iceberg] rdblue closed pull request #1175: Flink: Add wrapper to adapt Row to StructLike

Posted by GitBox <gi...@apache.org>.
rdblue closed pull request #1175:
URL: https://github.com/apache/iceberg/pull/1175


   

