You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/13 04:56:37 UTC

[GitHub] [iceberg] zhangjun0x01 opened a new pull request #2082: Flink: add show partitions with specified partitions

zhangjun0x01 opened a new pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082


   query the iceberg partitions by specified partitions.
   like hive sql `SHOW PARTITIONS table_name PARTITION(ds='2010-03-03');  `
   now `SHOW PARTITIONS` is not supported by flink. 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#issuecomment-927251431


   > @zhangjun0x01 , are you still working on it? If not or you do not have time to fix it, I would like to pick it up.
   
   Sorry, I'm busy with other things recently. I don't have time to deal with iceberg. You can deal with this pr


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556322659



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());
+      // Long,Map,Struct,List type are not supported by flink,so we do not add them  here.
+      if (type instanceof Types.IntegerType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Integer.valueOf(value)));
+      } else if (type instanceof Types.StringType) {
+        filter = Expressions.and(filter, Expressions.equal(name, value));
+      } else if (type instanceof Types.DoubleType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Double.valueOf(value)));
+      } else if (type instanceof Types.FloatType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Float.valueOf(value)));
+      } else if (type instanceof Types.DateType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+      } else if (type instanceof Types.TimeType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+      } else if (type instanceof Types.TimestampType) {
+        filter = Expressions
+            .and(filter, Expressions.equal(name, DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+      } else if (type instanceof Types.BooleanType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Boolean.valueOf(value)));
+      } else if (type instanceof Types.DecimalType) {
+        filter = Expressions.and(filter, Expressions.equal(name, new BigDecimal(value)));
+      }
+    }

Review comment:
       > Also, to make it simpler, you could pull the `filter = Expressions.and(filter,...)` value out and then just have a function that returns the converted value. This way we don't need to repreat the `Expressions.and` portions so many times.
   
   yes,I updated it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556271650



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());
+      // Long,Map,Struct,List type are not supported by flink,so we do not add them  here.
+      if (type instanceof Types.IntegerType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Integer.valueOf(value)));
+      } else if (type instanceof Types.StringType) {
+        filter = Expressions.and(filter, Expressions.equal(name, value));
+      } else if (type instanceof Types.DoubleType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Double.valueOf(value)));
+      } else if (type instanceof Types.FloatType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Float.valueOf(value)));
+      } else if (type instanceof Types.DateType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+      } else if (type instanceof Types.TimeType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+      } else if (type instanceof Types.TimestampType) {
+        filter = Expressions
+            .and(filter, Expressions.equal(name, DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+      } else if (type instanceof Types.BooleanType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Boolean.valueOf(value)));
+      } else if (type instanceof Types.DecimalType) {
+        filter = Expressions.and(filter, Expressions.equal(name, new BigDecimal(value)));
+      }
+    }

Review comment:
       Also, to make it simpler, you could pull the `filter = Expressions.and(filter,...)` value out and then just have a function that returns the converted value. This way we don't need to repreat the `Expressions.and` portions so many times.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556265710



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());

Review comment:
       Nit: `entry.getKey()` is already named here as `name` so consider using that instead.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());
+      // Long,Map,Struct,List type are not supported by flink,so we do not add them  here.
+      if (type instanceof Types.IntegerType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Integer.valueOf(value)));
+      } else if (type instanceof Types.StringType) {
+        filter = Expressions.and(filter, Expressions.equal(name, value));
+      } else if (type instanceof Types.DoubleType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Double.valueOf(value)));
+      } else if (type instanceof Types.FloatType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Float.valueOf(value)));
+      } else if (type instanceof Types.DateType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+      } else if (type instanceof Types.TimeType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+      } else if (type instanceof Types.TimestampType) {
+        filter = Expressions
+            .and(filter, Expressions.equal(name, DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+      } else if (type instanceof Types.BooleanType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Boolean.valueOf(value)));
+      } else if (type instanceof Types.DecimalType) {
+        filter = Expressions.and(filter, Expressions.equal(name, new BigDecimal(value)));
+      }
+    }

Review comment:
       Perhaps this could be made simpler by using a `Map<class ? extends Type, Function<String, PrimitiveType>>` and then handling the non-PrimitiveType values that require special handling using if statements? Inspiration could be found in this file, for both the map as well as a way to handle `convertLiteral`: https://github.com/apache/iceberg/blob/04e73deb7d68e3c4011101384f725abb1aae6236/spark3/src/main/java/org/apache/iceberg/spark/SparkFilters.java




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556326048



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());
+      // Long,Map,Struct,List type are not supported by flink,so we do not add them  here.
+      if (type instanceof Types.IntegerType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Integer.valueOf(value)));
+      } else if (type instanceof Types.StringType) {
+        filter = Expressions.and(filter, Expressions.equal(name, value));
+      } else if (type instanceof Types.DoubleType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Double.valueOf(value)));
+      } else if (type instanceof Types.FloatType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Float.valueOf(value)));
+      } else if (type instanceof Types.DateType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+      } else if (type instanceof Types.TimeType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+      } else if (type instanceof Types.TimestampType) {
+        filter = Expressions
+            .and(filter, Expressions.equal(name, DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+      } else if (type instanceof Types.BooleanType) {
+        filter = Expressions.and(filter, Expressions.equal(name, Boolean.valueOf(value)));
+      } else if (type instanceof Types.DecimalType) {
+        filter = Expressions.and(filter, Expressions.equal(name, new BigDecimal(value)));
+      }
+    }

Review comment:
       > Perhaps this could be made simpler by using a `Map<class ? extends Type, Function<String, PrimitiveType>>` and then handling the non-PrimitiveType values that require special handling using if statements? Inspiration could be found in this file, for both the map as well as a way to handle `convertLiteral`: https://github.com/apache/iceberg/blob/04e73deb7d68e3c4011101384f725abb1aae6236/spark3/src/main/java/org/apache/iceberg/spark/SparkFilters.java
   
   The partition value provided by flink is of string type. We need to convert the value to the  iceberg's corresponding type. We can extract a method, but I think in the method we still need to use if to make judgments and convert them in turn.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556265014



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,

Review comment:
       Nit: It seems that the currently imported `org.apache.flink.table.expressions.Expression` is only used one time. It would possibly make sense to import `org.apache.iceberg.expressions.Expression` instead and then use the fully qualified name for the location where `org.apache.flink.table.expressions.Expression` is used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556322299



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,

Review comment:
       yes, I udpated it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556322442



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema schema,
+                                                                       CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());

Review comment:
       updated 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Flyangz commented on pull request #2082: Flink: add show partitions with specified partitions

Posted by GitBox <gi...@apache.org>.
Flyangz commented on pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#issuecomment-927219793


   @zhangjun0x01 , are you still working on it? If not or you do not have time to fix it, I would like to pick it up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org