Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/03/08 22:56:00 UTC
[jira] [Work logged] (GOBBLIN-1779) Add configuration to toggle topics with complex union types in batch compaction
[ https://issues.apache.org/jira/browse/GOBBLIN-1779?focusedWorklogId=849928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-849928 ]
ASF GitHub Bot logged work on GOBBLIN-1779:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Mar/23 22:55
Start Date: 08/Mar/23 22:55
Worklog Time Spent: 10m
Work Description: vikrambohra commented on code in PR #3648:
URL: https://github.com/apache/gobblin/pull/3648#discussion_r1130157270
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Alternative to {@link Predicate} that handles wrapping (or tunneling) a single checked {@link Exception} derived class.
+ * Based on and extremely similar to {@link CheckedExceptionFunction}.<br><br>
+ *
+ * At first glance, it appears these 2 classes could be generalized and combined without the uncomfortable amount of duplication.
+ * But this is not possible to do cleanly because:
+ * <ul>
+ * <li> {@link Predicate} and {@link Function} are separate types with no inheritance hierarchy relationship</li>
+ * <li>
+ * {@link CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)} returns a {@link Predicate}
+ * but {@link CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)} returns a {@link Function}.
+ * Since Java does not support higher-kinded generics (i.e. type parameters for types that are
+ * themselves parameterized), the two return types cannot be abstracted behind a single type parameter.
+ * </li>
+ * </ul>
+ */
+@FunctionalInterface
+public interface CheckedExceptionPredicate<T, E extends Exception> {
Review Comment:
Overall, good thought. Should we consider the existing Predicate interfaces in Guava or other utility libraries?
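For context, here is a minimal sketch of the tunneling pattern the Javadoc above describes, using only the JDK. The names `IoPredicate`, `TunneledIOException`, `tunnel` and `keepShort` are hypothetical and are not the PR's actual API. As far as I can tell, Guava's `com.google.common.base.Predicate` has the same limitation as `java.util.function.Predicate` (its `apply` method declares no checked exceptions), so some wrapper of this kind would probably still be needed.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class TunnelingSketch {
  /** Hypothetical stand-in for a predicate whose test may throw IOException. */
  @FunctionalInterface
  interface IoPredicate<T> {
    boolean test(T input) throws IOException;
  }

  /** Unchecked carrier that tunnels an IOException through stream lambdas. */
  static final class TunneledIOException extends RuntimeException {
    TunneledIOException(IOException cause) {
      super(cause);
    }

    IOException unwrap() {
      return (IOException) getCause();
    }
  }

  /** Adapts an IoPredicate to a plain Predicate by wrapping any IOException. */
  static <T> Predicate<T> tunnel(IoPredicate<T> predicate) {
    return input -> {
      try {
        return predicate.test(input);
      } catch (IOException e) {
        throw new TunneledIOException(e);
      }
    };
  }

  /** Filters inside a stream, then restores the checked exception at the boundary. */
  static List<String> keepShort(List<String> names) throws IOException {
    IoPredicate<String> isShort = name -> {
      if (name.isEmpty()) {
        throw new IOException("empty name");
      }
      return name.length() <= 3;
    };
    try {
      return names.stream().filter(tunnel(isShort)).collect(Collectors.toList());
    } catch (TunneledIOException e) {
      throw e.unwrap();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(keepShort(Arrays.asList("ab", "abcd", "xyz")));
  }
}
```

The caller of `keepShort` still sees a plain `IOException`; the unchecked wrapper exists only between the lambda and the `catch` block.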
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.base.Optional;
+import gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines if a dataset's hive schema contains a non optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> implements CheckedExceptionPredicate<T, IOException> {
+ private HiveRegister hiveRegister;
Review Comment:
Make this final.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java:
##########
@@ -0,0 +1,104 @@
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a {@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements DatasetsFinder<T> {
+ private static final String PREFIX = "filtering.datasets.finder.";
+ public static final String DATASET_CLASS = PREFIX + "class";
+ public static final String ALLOWED = PREFIX + "allowed.predicates";
+ public static final String DENIED = PREFIX + "denied.predicates";
+
+ protected DatasetsFinder<T> datasetFinder;
+ protected List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates;
+ protected List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates;
+
+ public DatasetsFinderFilteringDecorator(FileSystem fs, Properties properties) throws IOException {
+ this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+ DATASET_CLASS, properties, fs, DefaultFileSystemGlobFinder.class.getName());
+ this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+ this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+ }
+
+ @VisibleForTesting
+ DatasetsFinderFilteringDecorator(
+ DatasetsFinder<T> datasetsFinder,
+ List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+ List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+ this.datasetFinder = datasetsFinder;
+ this.allowDatasetPredicates = allowDatasetPredicates;
+ this.denyDatasetPredicates = denyDatasetPredicates;
+ }
+
+ @Override
+ public List<T> findDatasets() throws IOException {
+ List<T> datasets = datasetFinder.findDatasets();
+ List<T> allowedDatasets = null;
Review Comment:
Instantiate with an empty list to avoid an NPE.
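A minimal sketch of what that could look like, assuming plain `java.util.function.Predicate` as a stand-in for `CheckedExceptionPredicate`. The class and method names here are hypothetical, and the exception tunneling is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

class AllowDenySketch {
  /** Keeps a dataset only if every allow predicate matches and no deny predicate matches. */
  static <T> List<T> filter(List<T> datasets, List<Predicate<T>> allow, List<Predicate<T>> deny) {
    // Start from an empty list so the method can never return null.
    List<T> allowed = new ArrayList<>();
    for (T dataset : datasets) {
      boolean passesAllow = allow.stream().allMatch(p -> p.test(dataset));
      boolean passesDeny = deny.stream().noneMatch(p -> p.test(dataset));
      if (passesAllow && passesDeny) {
        allowed.add(dataset);
      }
    }
    return allowed;
  }

  public static void main(String[] args) {
    List<Predicate<String>> allow = Collections.singletonList(s -> s.length() > 1);
    List<Predicate<String>> deny = Collections.singletonList(s -> s.startsWith("c"));
    System.out.println(filter(Arrays.asList("a", "bb", "ccc"), allow, deny)); // prints [bb]
  }
}
```

Note that `allMatch` and `noneMatch` both return true on an empty stream, so a decorator configured with no predicates passes every dataset through unchanged.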
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java:
##########
@@ -0,0 +1,104 @@
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a {@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements DatasetsFinder<T> {
+ private static final String PREFIX = "filtering.datasets.finder.";
+ public static final String DATASET_CLASS = PREFIX + "class";
+ public static final String ALLOWED = PREFIX + "allowed.predicates";
+ public static final String DENIED = PREFIX + "denied.predicates";
+
+ protected DatasetsFinder<T> datasetFinder;
+ protected List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates;
+ protected List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates;
+
+ public DatasetsFinderFilteringDecorator(FileSystem fs, Properties properties) throws IOException {
+ this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+ DATASET_CLASS, properties, fs, DefaultFileSystemGlobFinder.class.getName());
+ this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+ this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+ }
+
+ @VisibleForTesting
+ DatasetsFinderFilteringDecorator(
+ DatasetsFinder<T> datasetsFinder,
+ List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+ List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+ this.datasetFinder = datasetsFinder;
+ this.allowDatasetPredicates = allowDatasetPredicates;
+ this.denyDatasetPredicates = denyDatasetPredicates;
+ }
+
+ @Override
+ public List<T> findDatasets() throws IOException {
+ List<T> datasets = datasetFinder.findDatasets();
+ List<T> allowedDatasets = null;
+ try {
+ allowedDatasets = datasets.parallelStream()
+ .filter(dataset -> allowDatasetPredicates.stream()
+ .map(CheckedExceptionPredicate::wrapToTunneled)
+ .allMatch(p -> p.test(dataset)))
+ .filter(dataset -> denyDatasetPredicates.stream()
+ .map(CheckedExceptionPredicate::wrapToTunneled)
+ .noneMatch(predicate -> predicate.test(dataset)))
+ .collect(Collectors.toList());
+ } catch (CheckedExceptionPredicate.WrappedIOException wrappedIOException) {
+ wrappedIOException.rethrowWrapped();
+ }
+
+ return allowedDatasets;
+ }
+
+ @Override
+ public Path commonDatasetRoot() {
+ return datasetFinder.commonDatasetRoot();
+ }
+
+ private List<CheckedExceptionPredicate<T,IOException>> instantiatePredicates(String key, Properties props)
+ throws IOException {
+ List<CheckedExceptionPredicate<T,IOException>> predicates = new ArrayList<>();
+ try {
+ for (String className : PropertiesUtils.getPropAsList(props, key)) {
+ predicates.add((CheckedExceptionPredicate<T, IOException>)
+ GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), props));
Review Comment:
Can we call the exact constructor? Invoking the longest constructor is not very clean.
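One way this could look, as a sketch only: resolve the single-argument `Properties` constructor explicitly through standard JDK reflection. It assumes every configured predicate exposes a public constructor taking `Properties`, which the PR does not state; `HasOwner` and `instantiate` are hypothetical names used for illustration.

```java
import java.io.IOException;
import java.util.Properties;
import java.util.function.Predicate;

class ExactConstructorSketch {
  /** Hypothetical predicate with a public (Properties) constructor. */
  static class HasOwner implements Predicate<String> {
    private final String owner;

    public HasOwner(Properties props) {
      this.owner = props.getProperty("owner", "");
    }

    @Override
    public boolean test(String datasetName) {
      return datasetName.startsWith(owner);
    }
  }

  /** Looks up exactly the (Properties) constructor instead of picking the longest one. */
  @SuppressWarnings("unchecked")
  static Predicate<String> instantiate(String className, Properties props) throws IOException {
    try {
      return (Predicate<String>) Class.forName(className)
          .getConstructor(Properties.class)
          .newInstance(props);
    } catch (ReflectiveOperationException e) {
      // Covers a missing class, a missing constructor, and failures inside the constructor.
      throw new IOException("Could not instantiate predicate " + className, e);
    }
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.setProperty("owner", "team_");
    Predicate<String> predicate = instantiate(HasOwner.class.getName(), props);
    System.out.println(predicate.test("team_events"));
  }
}
```

The trade-off is that this pins every predicate to one constructor signature, whereas a longest-constructor lookup tolerates predicates with differing signatures.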
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.base.Optional;
+import gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines if a dataset's hive schema contains a non optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> implements CheckedExceptionPredicate<T, IOException> {
+ private HiveRegister hiveRegister;
+ private Pattern pattern;
Review Comment:
Make this final.
Issue Time Tracking
-------------------
Worklog Id: (was: 849928)
Time Spent: 1h 20m (was: 1h 10m)
> Add configuration to toggle topics with complex union types in batch compaction
> --------------------------------------------------------------------------------
>
> Key: GOBBLIN-1779
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1779
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)