Posted to issues@nifi.apache.org by "mark-bathori (via GitHub)" <gi...@apache.org> on 2023/06/29 07:02:16 UTC

[GitHub] [nifi] mark-bathori opened a new pull request, #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

mark-bathori opened a new pull request, #7449:
URL: https://github.com/apache/nifi/pull/7449

   # Summary
   
   [NIFI-11334](https://issues.apache.org/jira/browse/NIFI-11334)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7449:
URL: https://github.com/apache/nifi/pull/7449#discussion_r1248004762


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperties.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+public class IcebergCatalogProperties {
+
+    public static final String METASTORE_URI = "metastoreUri";
+    public static final String WAREHOUSE_LOCATION = "warehouseLocation";

Review Comment:
   Yes, the enum is more suitable for this. Since the enum value will be used as the catalog property key, `icebergPropertyName` is not needed anymore.
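   Here is a minimal sketch of the enum-keyed approach described above. It assumes a hypothetical `IcebergCatalogProperty` enum whose constants serve directly as catalog property keys. The constant names mirror the `METASTORE_URI` and `WAREHOUSE_LOCATION` fields in the diff; the class and method names are illustrative only.

   ```java
   import java.util.EnumMap;
   import java.util.Map;

   public class CatalogPropertyExample {

       // Hypothetical enum replacing the String constants in IcebergCatalogProperties.
       // The constant itself is the map key, so no separate property-name field is needed.
       enum IcebergCatalogProperty {
           METASTORE_URI,
           WAREHOUSE_LOCATION
       }

       // Builds a property map keyed by the enum; only non-null values are stored.
       static Map<IcebergCatalogProperty, String> buildProperties(String metastoreUri, String warehouseLocation) {
           final Map<IcebergCatalogProperty, String> properties = new EnumMap<>(IcebergCatalogProperty.class);
           if (metastoreUri != null) {
               properties.put(IcebergCatalogProperty.METASTORE_URI, metastoreUri);
           }
           if (warehouseLocation != null) {
               properties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation);
           }
           return properties;
       }

       public static void main(String[] args) {
           System.out.println(buildProperties("thrift://metastore:9083", "/warehouse"));
       }
   }
   ```

   `EnumMap` is a natural fit here because the key set is fixed and known at compile time. It also iterates in declaration order.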





[GitHub] [nifi] asfgit closed pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage
URL: https://github.com/apache/nifi/pull/7449




[GitHub] [nifi] turcsanyip commented on a diff in pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7449:
URL: https://github.com/apache/nifi/pull/7449#discussion_r1246952906


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -117,6 +122,15 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         }
     }
 
+    @Override
+    public String getClassloaderIsolationKey(PropertyContext context) {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            return kerberosUserService.getIdentifier();

Review Comment:
   I would suggest using the same classloader isolation key as the other Hadoop-related components: the Kerberos principal.
   ```
   final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
   return kerberosUser.getPrincipal();
   ```
   The controller service identifier also works, but it may be too restrictive. It creates separate classloaders for controller services that have the same principal, even though they could share a classloader.
   For example, the user can create a process group containing the Kerberos service and the Iceberg processor, and then copy it multiple times. That is not the best design, because the Kerberos service should be moved to the parent process group in this case, but I can imagine it happening.
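   The difference between the two keys can be illustrated with a simplified model in which one class loader is created per distinct isolation key. This is not NiFi's actual class loader management; `countClassLoaders` and the identifier/principal values are hypothetical. The model only shows why copied process groups share a loader when keyed by principal, but not when keyed by service identifier.

   ```java
   import java.net.URL;
   import java.net.URLClassLoader;
   import java.util.HashMap;
   import java.util.Map;

   public class IsolationKeyExample {

       // Simplified model: one class loader per distinct isolation key (not NiFi's real implementation).
       static int countClassLoaders(String... isolationKeys) {
           final Map<String, ClassLoader> classLoaders = new HashMap<>();
           for (final String key : isolationKeys) {
               // computeIfAbsent creates a loader only the first time a key is seen.
               classLoaders.computeIfAbsent(key,
                       k -> new URLClassLoader(new URL[0], IsolationKeyExample.class.getClassLoader()));
           }
           return classLoaders.size();
       }

       public static void main(String[] args) {
           // Two copies of a process group: different service identifiers, same principal.
           System.out.println("Keyed by identifier: " + countClassLoaders("service-1", "service-2"));
           System.out.println("Keyed by principal: " + countClassLoaders("nifi@EXAMPLE.COM", "nifi@EXAMPLE.COM"));
       }
   }
   ```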





[GitHub] [nifi] krisztina-zsihovszki commented on a diff in pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "krisztina-zsihovszki (via GitHub)" <gi...@apache.org>.
krisztina-zsihovszki commented on code in PR #7449:
URL: https://github.com/apache/nifi/pull/7449#discussion_r1246974701


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperties;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+
+public class IcebergCatalogFactory {
+
+    private final IcebergCatalogService catalogService;
+
+    public IcebergCatalogFactory(IcebergCatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
+
+    public Catalog create() {
+        return switch (catalogService.getCatalogServiceType()) {

Review Comment:
   Switch expressions with the arrow operator are not supported in Java 8. Since this processor is also targeted for the 1.x line, please use Java 8-compatible syntax.
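   A Java 8-compatible replacement for the `return switch (...) { ... };` form is a classic `switch` statement in which each case returns directly. In the sketch below, the `CatalogServiceType` enum and the returned strings are hypothetical placeholders for `IcebergCatalogServiceType` and the actual catalog initialization.

   ```java
   public class CatalogFactoryJava8Example {

       // Hypothetical stand-in for IcebergCatalogServiceType; real constant names may differ.
       enum CatalogServiceType {
           HIVE,
           HADOOP
       }

       // Java 8 syntax: every case returns, and the default branch throws,
       // so the compiler accepts the method without a trailing return.
       static String create(CatalogServiceType type) {
           switch (type) {
               case HIVE:
                   return "HiveCatalog";    // placeholder for initializing a HiveCatalog
               case HADOOP:
                   return "HadoopCatalog";  // placeholder for initializing a HadoopCatalog
               default:
                   throw new IllegalArgumentException("Unsupported catalog service type: " + type);
           }
       }

       public static void main(String[] args) {
           for (final CatalogServiceType type : CatalogServiceType.values()) {
               System.out.println(type + " -> " + create(type));
           }
       }
   }
   ```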



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java:
##########
@@ -69,28 +65,43 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
     }
 
-    private HiveCatalog catalog;
-
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
 
         final List<ValidationResult> problems = new ArrayList<>();
-        String configMetastoreUri = null;
-        String configWarehouseLocation = null;
+        boolean configMetastoreUriPresent = false;
+        boolean configWarehouseLocationPresent = false;

Review Comment:
   Not related to the actual change, but I just noticed that the expression language support configuration of these properties can be enhanced.
   
   In my view, both METASTORE_URI and WAREHOUSE_LOCATION can use
   
       .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
   
   since flow files are not used when they are evaluated.
   (The same comment applies to WAREHOUSE_PATH in HadoopCatalogService.)



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java:
##########
@@ -44,24 +55,30 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
             .dynamicallyModifiesClasspath(true)
             .build();
 
-    /**
-     * Loads configuration files from the provided paths.
-     *
-     * @param configFiles list of config file paths separated with comma
-     * @return merged configuration
-     */
-    protected Configuration getConfigurationFromFiles(String configFiles) {
-        final Configuration conf = new Configuration();
-        if (StringUtils.isNotBlank(configFiles)) {
+    protected List<Document> parseConfigFile(String configFiles) {
+        List<Document> documentList = new ArrayList<>();
+        if (configFiles != null && !configFiles.trim().isEmpty()) {
             for (final String configFile : configFiles.split(",")) {
-                conf.addResource(new Path(configFile.trim()));
+                File file = new File(configFile.trim());
+                try (final InputStream fis = new FileInputStream(file);
+                     final InputStream in = new BufferedInputStream(fis)) {
+                    final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
+                    documentList.add(documentProvider.parse(in));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);

Review Comment:
   I'd rather use ProcessException and include a descriptive error message as well.
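   Here is a minimal sketch of the suggested error handling. The nested `ProcessException` is a local stand-in for `org.apache.nifi.processor.exception.ProcessException`, so the example compiles on its own. Counting bytes is a placeholder for the real XML parsing step (`StandardDocumentProvider.parse` in the diff), and `readConfigFiles` is a hypothetical name.

   ```java
   import java.io.BufferedInputStream;
   import java.io.File;
   import java.io.FileInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.ArrayList;
   import java.util.List;

   public class ConfigFileErrorExample {

       // Local stand-in for NiFi's ProcessException(String, Throwable).
       static class ProcessException extends RuntimeException {
           ProcessException(String message, Throwable cause) {
               super(message, cause);
           }
       }

       // Reads each comma-separated file; the byte count stands in for parsing the document.
       static List<Integer> readConfigFiles(String configFiles) {
           final List<Integer> sizes = new ArrayList<>();
           if (configFiles != null && !configFiles.trim().isEmpty()) {
               for (final String configFile : configFiles.split(",")) {
                   final File file = new File(configFile.trim());
                   try (final InputStream fis = new FileInputStream(file);
                        final InputStream in = new BufferedInputStream(fis)) {
                       int count = 0;
                       while (in.read() != -1) {
                           count++;
                       }
                       sizes.add(count);
                   } catch (IOException e) {
                       // The message names the failing file instead of rethrowing a bare RuntimeException.
                       throw new ProcessException("Failed to load configuration file: " + file.getAbsolutePath(), e);
                   }
               }
           }
           return sizes;
       }
   }
   ```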





[GitHub] [nifi] mark-bathori commented on pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on PR #7449:
URL: https://github.com/apache/nifi/pull/7449#issuecomment-1614964918

   Thanks for the reviews @turcsanyip @krisztina-zsihovszki.




[GitHub] [nifi] mark-bathori commented on a diff in pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7449:
URL: https://github.com/apache/nifi/pull/7449#discussion_r1248012824


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java:
##########
@@ -117,6 +122,15 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         }
     }
 
+    @Override
+    public String getClassloaderIsolationKey(PropertyContext context) {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            return kerberosUserService.getIdentifier();

Review Comment:
   Unfortunately, the Kerberos principal can't be used here. The controller service is not yet enabled when the isolation key is retrieved, so the principal would be null.
   As far as I can see, the HDFS processors use the Kerberos principal. It works properly there because `dynamicallyModifiesClasspath` is set on the `hadoopConfigPath` property, so the isolation key is also evaluated at processor start. That behaviour is missing here.





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7449: NIFI-11334: PutIceberg processor instance interference due same class loader usage

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7449:
URL: https://github.com/apache/nifi/pull/7449#discussion_r1246952906


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java:
##########
@@ -17,16 +17,18 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.nifi.controller.ControllerService;
 
+import java.util.Map;
+
 /**
  * Provides a basic connector to Iceberg catalog services.
  */
 public interface IcebergCatalogService extends ControllerService {
 
-    Catalog getCatalog();
+    IcebergCatalogServiceType getCatalogServiceType();
+
+    Map<String, String> getAdditionalParameters();

Review Comment:
   `additionalParameters` is quite generic, and the callers also use "properties" for the return value.
   `getCatalogProperties()` would be more descriptive.
   Also, the return type would be `Map<IcebergCatalogProperty, String>` (see the comment about the `IcebergCatalogProperty` enum above).



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java:
##########
@@ -17,16 +17,18 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.nifi.controller.ControllerService;
 
+import java.util.Map;
+
 /**
  * Provides a basic connector to Iceberg catalog services.
  */
 public interface IcebergCatalogService extends ControllerService {
 
-    Catalog getCatalog();
+    IcebergCatalogServiceType getCatalogServiceType();
+
+    Map<String, String> getAdditionalParameters();
 
-    Configuration getConfiguration();
+    String getConfigFiles();

Review Comment:
   It returns a comma-separated list of file paths, which is not obvious at first. The method could parse the raw property value and return the paths in a list:
   ```
   List<String> getConfigFilePaths();
   ```
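   A sketch of the parsing that a `getConfigFilePaths()` implementation might perform on the raw property value: split on commas, trim each entry, and skip blanks. `parseConfigFilePaths` is a hypothetical helper name, and the example paths are illustrative.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class ConfigFilePathsExample {

       // Turns "a.xml, b.xml," into an unmodifiable [a.xml, b.xml]; null or blank input yields an empty list.
       static List<String> parseConfigFilePaths(String rawValue) {
           if (rawValue == null || rawValue.trim().isEmpty()) {
               return Collections.emptyList();
           }
           final List<String> paths = new ArrayList<>();
           for (final String path : rawValue.split(",")) {
               final String trimmed = path.trim();
               if (!trimmed.isEmpty()) {
                   paths.add(trimmed);
               }
           }
           return Collections.unmodifiableList(paths);
       }

       public static void main(String[] args) {
           System.out.println(parseConfigFilePaths("/etc/hadoop/core-site.xml, /etc/hive/hive-site.xml,"));
       }
   }
   ```

   Callers then iterate over ready-to-use paths instead of repeating the split-and-trim logic.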


