Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/01 08:50:09 UTC

[GitHub] [iceberg] openinx opened a new pull request #2285: Flink: Support bucket table.

openinx opened a new pull request #2285:
URL: https://github.com/apache/iceberg/pull/2285


   Apache Flink does not support bucket or hidden-partition syntax, so this PR adds a Flink table property to the Iceberg table that buckets the table into the specified number of buckets:
   
   ```sql
   CREATE TABLE test_iceberg(
      id  INT,
      data STRING
   ) WITH (
      'flink.partitions'='id=bucket[16];data=bucket[8]'
   )
   ```
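
The property value above packs several `<column>=<transform>` pairs into one string. As a rough illustration only, the sketch below splits such a value into column/transform pairs. The class and method names are hypothetical and not part of the PR; the quoted diff appears to use per-column property keys built from `FlinkTableProperties.PARTITION_BY_PREFIX` rather than this combined form, so treat the `;` and `=` separators as assumptions taken from the example string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: splits a 'flink.partitions' value
// such as "id=bucket[16];data=bucket[8]" into column -> transform pairs.
class FlinkPartitionsPropertySketch {

  static Map<String, String> parse(String propValue) {
    // LinkedHashMap keeps the order in which the columns were declared.
    Map<String, String> transforms = new LinkedHashMap<>();
    if (propValue == null || propValue.trim().isEmpty()) {
      return transforms;
    }

    for (String entry : propValue.split(";")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;  // tolerate stray separators such as "a=identity;;b=year"
      }

      int eq = trimmed.indexOf('=');
      if (eq <= 0 || eq == trimmed.length() - 1) {
        throw new IllegalArgumentException(
            "Expected '<column>=<transform>' but got: " + trimmed);
      }

      String column = trimmed.substring(0, eq).trim();
      String transform = trimmed.substring(eq + 1).trim();
      if (transforms.put(column, transform) != null) {
        throw new IllegalArgumentException("Duplicate partition column: " + column);
      }
    }
    return transforms;
  }

  public static void main(String[] args) {
    // prints {id=bucket[16], data=bucket[8]}
    System.out.println(parse("id=bucket[16];data=bucket[8]"));
  }
}
```

Each resulting transform string (for example `bucket[16]`) would still need to be validated against the column type before a partition field is added.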


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2285: Flink: Support bucket table.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2285:
URL: https://github.com/apache/iceberg/pull/2285#issuecomment-789384685


   @yyanyy thanks for checking. This is actually a preview PR for my own GitHub repo (https://github.com/openinx/incubator-iceberg); I opened the pull request against the official Apache repo by mistake. I created this patch because I wanted to find out what specific work is needed to support bucketed tables on the Flink side. The table property approach (defining the buckets in a property) is not the final solution for Flink SQL, because it seems hard to apply Flink runtime optimizations when the bucket policy is defined in a table property. The Flink team is considering providing a `BUCKET` SQL clause to make the bucket feature work. I will close this PR, thanks.




[GitHub] [iceberg] yyanyy commented on a change in pull request #2285: Flink: Support bucket table.

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #2285:
URL: https://github.com/apache/iceberg/pull/2285#discussion_r585840378



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -361,25 +362,27 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     validateFlinkTable(table);
 
     Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
-    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+    PartitionSpec.Builder specBuilder = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
-    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
     String location = null;
+    ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();

Review comment:
       Nit: seems like an unnecessary change (rename)

##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+  private PartitionUtil() {
+  }
+
+  static String partitionPropKey(String colName) {
+    return String.format("%s%s", FlinkTableProperties.PARTITION_BY_PREFIX, colName);
+  }
+
+  static boolean isPartitionField(String propKey) {
+    return propKey != null && propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX);
+  }
+
+  private static String partitionField(String propKey) {
+    if (propKey != null && propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX)) {
+      return propKey.replace(FlinkTableProperties.PARTITION_BY_PREFIX, "");
+    } else {
+      return null;
+    }
+  }
+
+  static void addPartitionField(Schema schema, PartitionSpec.Builder builder, String propKey, String propValue) {
+    String colName = partitionField(propKey);
+    Preconditions.checkNotNull(colName,
+        "Table property '%s' is not an valid partition field property, please use '%s<column-name>'", propKey,
+        FlinkTableProperties.PARTITION_BY_PREFIX);
+
+    Types.NestedField nestedField = schema.findField(colName);
+    Preconditions.checkNotNull(nestedField, "Cannot find field %s in schema: %s", colName, schema);
+
+    Transform<?, ?> transform = Transforms.fromString(nestedField.type(), propValue);
+    String transformString = transform.toString();
+    switch (transformType(transformString)) {
+      case "bucket":
+        builder.bucket(colName, findWidth(transformString));
+        break;
+
+      case "truncate":
+        builder.truncate(colName, findWidth(transformString));
+        break;
+
+      case "identify":
+        builder.identity(colName);
+        break;
+
+      case "year":
+        builder.year(colName);
+        break;
+
+      case "month":
+        builder.month(colName);
+        break;
+
+      case "day":
+        builder.day(colName);
+        break;
+
+      case "hour":
+        builder.hour(colName);
+        break;
+
+      case "void":
+        builder.alwaysNull(colName);
+        break;
+
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown transform %s for field %s", transformString, colName));
+    }
+  }
+
+  static Term toIcebergTerm(String colName, Transform<?, ?> transform) {
+    String transformString = transform.toString();
+    switch (transformType(transformString)) {
+      case "bucket":
+        return Expressions.bucket(colName, findWidth(transformString));
+
+      case "truncate":
+        return Expressions.truncate(colName, findWidth(transformString));
+
+      case "identify":

Review comment:
       Did you mean "identity"? This applies both here and at L74 and L139. We might also want test coverage for this class to catch similar issues.
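
One way to make a misspelling like this fail early is to map the transform name onto an enum before switching on it. The sketch below is only an illustration under assumptions: the class, enum, and method names are hypothetical, and it assumes transform strings of the form `identity` or `bucket[16]`, as in the PR description. With an enum, a case label such as `IDENTIFY` would not compile, and an unknown name in the input is rejected in one place.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not the PR's code: parses transform strings into an enum
// so that a misspelled case label cannot compile.
class TransformKindSketch {

  enum Kind { BUCKET, TRUNCATE, IDENTITY, YEAR, MONTH, DAY, HOUR, VOID }

  // Matches a bare name ("identity") or a name with a width ("bucket[16]").
  private static final Pattern TRANSFORM = Pattern.compile("(\\w+)(?:\\[(\\d+)\\])?");

  private static Matcher match(String transformString) {
    Matcher matcher = TRANSFORM.matcher(transformString.trim());
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Malformed transform: " + transformString);
    }
    return matcher;
  }

  static Kind kindOf(String transformString) {
    String name = match(transformString).group(1).toUpperCase(Locale.ROOT);
    try {
      return Kind.valueOf(name);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown transform: " + transformString, e);
    }
  }

  static int widthOf(String transformString) {
    String width = match(transformString).group(2);
    if (width == null) {
      throw new IllegalArgumentException("Transform has no width: " + transformString);
    }
    return Integer.parseInt(width);
  }

  // Stand-in for the builder calls: returns a readable description of the field.
  static String describe(String column, String transformString) {
    Kind kind = kindOf(transformString);
    String name = kind.name().toLowerCase(Locale.ROOT);
    switch (kind) {
      case BUCKET:
      case TRUNCATE:
        return name + "(" + column + ", " + widthOf(transformString) + ")";
      case IDENTITY:
      case YEAR:
      case MONTH:
      case DAY:
      case HOUR:
      case VOID:
        return name + "(" + column + ")";
      default:
        throw new IllegalArgumentException("Unhandled transform: " + transformString);
    }
  }
}
```

A unit test that feeds every supported transform name through such a method would cover the case this comment flags.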

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -471,10 +477,11 @@ private static void validateFlinkTable(CatalogBaseTable table) {
     }
   }
 
-  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+  private static PartitionSpec.Builder toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {

Review comment:
       nit: wondering if we want to rename this to `toPartitionSpecBuilder`

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -573,7 +597,7 @@ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partit
 
   @Override
   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
-      boolean ignoreIfExists) throws CatalogException {
+                              boolean ignoreIfExists) throws CatalogException {

Review comment:
       Nit: seems like unnecessary changes, same as below

##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+  private PartitionUtil() {
+  }
+
+  static String partitionPropKey(String colName) {

Review comment:
       nit: this looks like it is only used by tests; could it be moved to the test code?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -518,6 +526,22 @@ private static void commitChanges(Table table, String setLocation, String setSna
           .commit();
     }
 
+    if (newSpec != null) {

Review comment:
       It looks like `newSpec` is never null after this change, which means this transaction is always triggered. Do we want to compare it with the old spec and skip the transaction when they are the same?
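
A minimal sketch of that guard, written generically so it stands alone: the class and method names are hypothetical, `S` stands in for the partition spec type, and the sketch assumes that type has a value-based `equals`. Whether plain equality is the right comparison for partition specs is left open here.

```java
import java.util.Objects;

// Hypothetical sketch, not the PR's code: only run the commit when the spec changed.
class SpecChangeSketch {

  // S stands in for the partition spec type; equals() is assumed to be value-based.
  static <S> boolean commitIfChanged(S currentSpec, S newSpec, Runnable commit) {
    if (newSpec == null || Objects.equals(currentSpec, newSpec)) {
      return false;  // nothing changed, so no transaction is opened
    }
    commit.run();
    return true;
  }
}
```

The boolean return value lets the caller tell whether a commit actually happened, which is convenient in tests.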

##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+  private PartitionUtil() {
+  }
+
+  static String partitionPropKey(String colName) {
+    return String.format("%s%s", FlinkTableProperties.PARTITION_BY_PREFIX, colName);
+  }
+
+  static boolean isPartitionField(String propKey) {
+    return propKey != null && propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX);
+  }
+
+  private static String partitionField(String propKey) {
+    if (propKey != null && propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX)) {
+      return propKey.replace(FlinkTableProperties.PARTITION_BY_PREFIX, "");
+    } else {
+      return null;
+    }
+  }
+
+  static void addPartitionField(Schema schema, PartitionSpec.Builder builder, String propKey, String propValue) {
+    String colName = partitionField(propKey);
+    Preconditions.checkNotNull(colName,
+        "Table property '%s' is not an valid partition field property, please use '%s<column-name>'", propKey,
+        FlinkTableProperties.PARTITION_BY_PREFIX);
+
+    Types.NestedField nestedField = schema.findField(colName);
+    Preconditions.checkNotNull(nestedField, "Cannot find field %s in schema: %s", colName, schema);
+
+    Transform<?, ?> transform = Transforms.fromString(nestedField.type(), propValue);

Review comment:
       Can we check `transform instanceof` directly and do the parsing and assignment for each type? I think that would be safer than string comparison.
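
The shape of such a type-based dispatch might look like the sketch below. Everything in it is a stand-in: `SketchTransform`, `Bucket`, `Truncate`, and `Identity` are hypothetical types defined only for this example, and whether the library's concrete transform classes are visible from the Flink module is not something this thread confirms.

```java
// Hypothetical sketch with stand-in types; these are not the library's transform classes.
class InstanceofDispatchSketch {

  interface SketchTransform { }

  static final class Bucket implements SketchTransform {
    final int numBuckets;

    Bucket(int numBuckets) {
      this.numBuckets = numBuckets;
    }
  }

  static final class Truncate implements SketchTransform {
    final int width;

    Truncate(int width) {
      this.width = width;
    }
  }

  static final class Identity implements SketchTransform { }

  // Dispatches on the runtime type, so the width comes from a typed field
  // instead of being parsed back out of a string.
  static String apply(String column, SketchTransform transform) {
    if (transform instanceof Bucket) {
      return "bucket(" + column + ", " + ((Bucket) transform).numBuckets + ")";
    } else if (transform instanceof Truncate) {
      return "truncate(" + column + ", " + ((Truncate) transform).width + ")";
    } else if (transform instanceof Identity) {
      return "identity(" + column + ")";
    }
    throw new IllegalArgumentException(
        "Unsupported transform: " + transform.getClass().getSimpleName());
  }
}
```

Compared with switching on a string, a misspelled type name here is a compile error rather than a silently unreachable branch.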






[GitHub] [iceberg] openinx closed pull request #2285: Flink: Support bucket table.

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #2285:
URL: https://github.com/apache/iceberg/pull/2285


   

