Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/03/08 22:56:00 UTC

[jira] [Work logged] (GOBBLIN-1779) Add configuration to toggle topics with complex union types in batch compaction

     [ https://issues.apache.org/jira/browse/GOBBLIN-1779?focusedWorklogId=849928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-849928 ]

ASF GitHub Bot logged work on GOBBLIN-1779:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Mar/23 22:55
            Start Date: 08/Mar/23 22:55
    Worklog Time Spent: 10m 
      Work Description: vikrambohra commented on code in PR #3648:
URL: https://github.com/apache/gobblin/pull/3648#discussion_r1130157270


##########
gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Alternative to {@link Predicate} that handles wrapping (or tunneling) a single checked {@link Exception} derived class.
+ * Based on and extremely similar to {@link CheckedExceptionFunction}.<br><br>
+ *
+ * At first glance, it appears these 2 classes could be generalized and combined without the uncomfortable amount of duplication.
+ * But this is not possible to do cleanly because:
+ *   <ul>
+ *     <li> {@link Predicate} and {@link Function} are separate types with no inheritance hierarchy relationship</li>
+ *     <li>
+ *       {@link CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)} returns a {@link Predicate}
+ *       but {@link CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)} returns a {@link Function}, and
+ *       Java does not support higher-kinded types (i.e. type parameters for types that are themselves
+ *       parameterized), so the return type cannot be abstracted over
+ *     </li>
+ *   </ul>
+ */
+@FunctionalInterface
+public interface CheckedExceptionPredicate<T, E extends Exception> {

Review Comment:
   Overall a good idea. Should we consider the existing Predicate interfaces in Guava or other utility libraries?
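
   For context, a minimal sketch of the tunneling idea the Javadoc above describes. Neither `java.util.function.Predicate#test` nor Guava's `com.google.common.base.Predicate#apply` declares a checked exception, so a throwing predicate has to be adapted before it can be used in a stream. All names below (`TunnelingSketch`, `ThrowingPredicate`, `tunnel`, `filter`) are hypothetical and are not taken from the PR; the JDK's `UncheckedIOException` stands in for the PR's own wrapper type.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class TunnelingSketch {
  /** Hypothetical stand-in for a predicate that may throw a checked IOException. */
  @FunctionalInterface
  interface ThrowingPredicate<T> {
    boolean test(T arg) throws IOException;
  }

  /** Adapts a throwing predicate to java.util.function.Predicate by tunneling IOException. */
  static <T> Predicate<T> tunnel(ThrowingPredicate<T> p) {
    return arg -> {
      try {
        return p.test(arg);
      } catch (IOException e) {
        // Wrap in an unchecked exception so the lambda fits Predicate#test.
        throw new UncheckedIOException(e);
      }
    };
  }

  /** Filters the names and restores the checked exception at the stream boundary. */
  static List<String> filter(List<String> names, ThrowingPredicate<String> p) throws IOException {
    try {
      return names.stream().filter(tunnel(p)).collect(Collectors.toList());
    } catch (UncheckedIOException e) {
      // UncheckedIOException#getCause is typed as IOException, so no cast is needed.
      throw e.getCause();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(filter(Arrays.asList("a", "bb", "ccc"), s -> s.length() > 1));
  }
}
```

   Callers of `filter` still see a plain `IOException`; the unchecked wrapper exists only inside the stream pipeline.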



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.base.Optional;
+import gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines if a dataset's hive schema contains a non optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> implements CheckedExceptionPredicate<T, IOException> {
+  private HiveRegister hiveRegister;

Review Comment:
   Make this field final.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a {@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements DatasetsFinder<T> {
+  private static final String PREFIX = "filtering.datasets.finder.";
+  public static final String DATASET_CLASS = PREFIX + "class";
+  public static final String ALLOWED = PREFIX + "allowed.predicates";
+  public static final String DENIED = PREFIX + "denied.predicates";
+
+  protected DatasetsFinder<T> datasetFinder;
+  protected List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates;
+  protected List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates;
+
+  public DatasetsFinderFilteringDecorator(FileSystem fs, Properties properties) throws IOException {
+    this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+        DATASET_CLASS, properties, fs, DefaultFileSystemGlobFinder.class.getName());
+    this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+    this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+  }
+
+  @VisibleForTesting
+  DatasetsFinderFilteringDecorator(
+      DatasetsFinder<T> datasetsFinder,
+      List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+      List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+    this.datasetFinder = datasetsFinder;
+    this.allowDatasetPredicates = allowDatasetPredicates;
+    this.denyDatasetPredicates = denyDatasetPredicates;
+  }
+
+  @Override
+  public List<T> findDatasets() throws IOException {
+    List<T> datasets = datasetFinder.findDatasets();
+    List<T> allowedDatasets = null;

Review Comment:
   Initialize this with an empty list to avoid an NPE.
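
   A minimal sketch of the suggestion, assuming the filtering step can fail with a tunneled `IOException`. The class `EmptyDefaultSketch`, the `isAllowed` check, and the `tmp` prefix rule are hypothetical illustrations, not code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class EmptyDefaultSketch {
  /** Hypothetical check that may fail with a tunneled IOException. */
  static boolean isAllowed(String name) {
    if (name.isEmpty()) {
      throw new UncheckedIOException(new IOException("empty dataset name"));
    }
    return !name.startsWith("tmp");
  }

  static List<String> findAllowed(List<String> names) throws IOException {
    // Start from an empty list instead of null so the return value is never null.
    List<String> allowed = Collections.emptyList();
    try {
      allowed = names.stream()
          .filter(EmptyDefaultSketch::isAllowed)
          .collect(Collectors.toList());
    } catch (UncheckedIOException e) {
      throw e.getCause();
    }
    return allowed;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(findAllowed(Arrays.asList("events", "tmp_scratch")));
  }
}
```

   An alternative with the same effect would be to return the collected list directly from inside the `try` block, which removes the mutable local altogether.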



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a {@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements DatasetsFinder<T> {
+  private static final String PREFIX = "filtering.datasets.finder.";
+  public static final String DATASET_CLASS = PREFIX + "class";
+  public static final String ALLOWED = PREFIX + "allowed.predicates";
+  public static final String DENIED = PREFIX + "denied.predicates";
+
+  protected DatasetsFinder<T> datasetFinder;
+  protected List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates;
+  protected List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates;
+
+  public DatasetsFinderFilteringDecorator(FileSystem fs, Properties properties) throws IOException {
+    this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+        DATASET_CLASS, properties, fs, DefaultFileSystemGlobFinder.class.getName());
+    this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+    this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+  }
+
+  @VisibleForTesting
+  DatasetsFinderFilteringDecorator(
+      DatasetsFinder<T> datasetsFinder,
+      List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+      List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+    this.datasetFinder = datasetsFinder;
+    this.allowDatasetPredicates = allowDatasetPredicates;
+    this.denyDatasetPredicates = denyDatasetPredicates;
+  }
+
+  @Override
+  public List<T> findDatasets() throws IOException {
+    List<T> datasets = datasetFinder.findDatasets();
+    List<T> allowedDatasets = null;
+    try {
+      allowedDatasets = datasets.parallelStream()
+          .filter(dataset -> allowDatasetPredicates.stream()
+              .map(CheckedExceptionPredicate::wrapToTunneled)
+              .allMatch(p -> p.test(dataset)))
+          .filter(dataset -> denyDatasetPredicates.stream()
+              .map(CheckedExceptionPredicate::wrapToTunneled)
+              .noneMatch(predicate -> predicate.test(dataset)))
+          .collect(Collectors.toList());
+    } catch (CheckedExceptionPredicate.WrappedIOException wrappedIOException) {
+      wrappedIOException.rethrowWrapped();
+    }
+
+    return allowedDatasets;
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return datasetFinder.commonDatasetRoot();
+  }
+
+  private List<CheckedExceptionPredicate<T,IOException>> instantiatePredicates(String key, Properties props)
+      throws IOException {
+    List<CheckedExceptionPredicate<T,IOException>> predicates = new ArrayList<>();
+    try {
+      for (String className : PropertiesUtils.getPropAsList(props, key)) {
+        predicates.add((CheckedExceptionPredicate<T, IOException>)
+            GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), props));

Review Comment:
   Can we call the exact constructor? Using `invokeLongestConstructor` is not very clean.
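
   One way to read this suggestion, sketched with plain JDK reflection: look up the single expected constructor signature explicitly rather than letting a helper pick one. This assumes every predicate exposes a public constructor taking `Properties`; the class `ExactConstructorSketch`, the nested `PrefixPredicate`, and the `prefix` key are hypothetical.

```java
import java.io.IOException;
import java.util.Properties;
import java.util.function.Predicate;

class ExactConstructorSketch {
  /** Hypothetical predicate whose only supported constructor takes Properties. */
  public static class PrefixPredicate implements Predicate<String> {
    private final String prefix;

    public PrefixPredicate(Properties props) {
      this.prefix = props.getProperty("prefix", "");
    }

    @Override
    public boolean test(String s) {
      return s.startsWith(prefix);
    }
  }

  /** Instantiates className through its (Properties) constructor, failing loudly otherwise. */
  @SuppressWarnings("unchecked")
  static Predicate<String> instantiate(String className, Properties props) throws IOException {
    try {
      return (Predicate<String>) Class.forName(className)
          .getConstructor(Properties.class) // exact signature, no guessing
          .newInstance(props);
    } catch (ReflectiveOperationException e) {
      throw new IOException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.setProperty("prefix", "tracking.");
    Predicate<String> p = instantiate(PrefixPredicate.class.getName(), props);
    System.out.println(p.test("tracking.PageViewEvent"));
  }
}
```

   The trade-off is that a class without that exact constructor fails at instantiation time with a clear error, instead of being constructed through whichever constructor happens to be longest.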



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.base.Optional;
+import gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines if a dataset's hive schema contains a non optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> implements CheckedExceptionPredicate<T, IOException> {
+  private HiveRegister hiveRegister;
+  private Pattern pattern;

Review Comment:
   Make this field final.
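
   A small sketch of what this looks like when both fields are assigned exactly once in the constructor. The class `FinalFieldsSketch`, the `String` stand-in for the Hive register, and the property keys `registry.name` and `dataset.pattern` are hypothetical.

```java
import java.util.Properties;
import java.util.regex.Pattern;

class FinalFieldsSketch {
  // Both fields are set once in the constructor, so they can be final.
  private final String registryName;
  private final Pattern pattern;

  FinalFieldsSketch(Properties props) {
    this.registryName = props.getProperty("registry.name", "default");
    this.pattern = Pattern.compile(props.getProperty("dataset.pattern", ".*"));
  }

  String registryName() {
    return registryName;
  }

  boolean matches(String datasetUrn) {
    return pattern.matcher(datasetUrn).matches();
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("dataset.pattern", "/data/.+");
    System.out.println(new FinalFieldsSketch(props).matches("/data/tracking"));
  }
}
```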





Issue Time Tracking
-------------------

    Worklog Id:     (was: 849928)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add configuration to toggle topics with complex union types in batch compaction 
> --------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1779
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1779
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)