You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ym...@apache.org on 2017/03/29 14:47:07 UTC

nifi git commit: NIFI-3613: Fixed threading bug in JoltTransformJSON and added caching of compiled Transforms

Repository: nifi
Updated Branches:
  refs/heads/master 168fc72bb -> 83fa24b68


NIFI-3613: Fixed threading bug in JoltTransformJSON and added caching of compiled Transforms

Signed-off-by: Yolanda M. Davis <ym...@apache.org>

This closes #1630


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/83fa24b6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/83fa24b6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/83fa24b6

Branch: refs/heads/master
Commit: 83fa24b68f74843cc609bbc113ba930f847e4025
Parents: 168fc72
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Mar 28 15:52:32 2017 -0400
Committer: Yolanda M. Davis <ym...@apache.org>
Committed: Wed Mar 29 10:45:34 2017 -0400

----------------------------------------------------------------------
 .../processors/standard/JoltTransformJSON.java  | 182 +++++++++++--------
 1 file changed, 105 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/83fa24b6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index e8e5452..63e2a33 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.ByteArrayInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -25,7 +24,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -49,15 +50,13 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
-import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 import org.apache.nifi.processors.standard.util.jolt.TransformFactory;
-import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
@@ -120,6 +119,16 @@ public class JoltTransformJSON extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("Transform Cache Size")
+            .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("The FlowFile with transformed content will be routed to this relationship")
@@ -132,24 +141,34 @@ public class JoltTransformJSON extends AbstractProcessor {
     private final static List<PropertyDescriptor> properties;
     private final static Set<Relationship> relationships;
     private volatile ClassLoader customClassLoader;
-    private volatile JoltTransform transform;
-    private volatile String specJsonString;
     private final static String DEFAULT_CHARSET = "UTF-8";
 
-    static{
+    // Cache is guarded by synchronizing on 'this'.
+    private volatile int maxTransformsToCache = 10;
+    private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) {
+            final boolean evict = size() > maxTransformsToCache;
+            if (evict) {
+                getLogger().debug("Removing Jolt Transform from cache because cache is full");
+            }
+            return evict;
+        }
+    };
 
+    static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
         _properties.add(JOLT_TRANSFORM);
         _properties.add(CUSTOM_CLASS);
         _properties.add(MODULES);
         _properties.add(JOLT_SPEC);
+        _properties.add(TRANSFORM_CACHE_SIZE);
         properties = Collections.unmodifiableList(_properties);
 
         final Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
         _relationships.add(REL_FAILURE);
         relationships = Collections.unmodifiableSet(_relationships);
-
     }
 
     @Override
@@ -163,6 +182,7 @@ public class JoltTransformJSON extends AbstractProcessor {
     }
 
 
+
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
@@ -177,34 +197,29 @@ public class JoltTransformJSON extends AbstractProcessor {
                         .explanation(message)
                         .build());
             }
-
         } else {
-
             final ClassLoader customClassLoader;
-            try {
 
-                if(modulePath != null) {
+            try {
+                if (modulePath != null) {
                     customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
-                }else{
+                } else {
                     customClassLoader =  this.getClass().getClassLoader();
                 }
 
                 final String specValue =  validationContext.getProperty(JOLT_SPEC).getValue();
                 final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
 
-                if(validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null){
+                if (validationContext.isExpressionLanguagePresent(specValue) && invalidExpressionMsg != null) {
                     final String customMessage = "The expression language used withing this specification is invalid";
                     results.add(new ValidationResult.Builder().valid(false)
                             .explanation(customMessage)
                             .build());
-
-                }else {
-
+                } else {
                     //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
                     Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{}","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
 
                     if (CUSTOMR.getValue().equals(transform)) {
-
                         if (StringUtils.isEmpty(customTransform)) {
                             final String customMessage = "A custom transformation class should be provided. ";
                             results.add(new ValidationResult.Builder().valid(false)
@@ -213,12 +228,10 @@ public class JoltTransformJSON extends AbstractProcessor {
                         } else {
                             TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
                         }
-
                     } else {
                         TransformFactory.getTransform(customClassLoader, transform, specJson);
                     }
                 }
-
             } catch (final Exception e) {
                 getLogger().info("Processor is not valid - " + e.toString());
                 String message = "Specification not valid for the selected transformation." ;
@@ -227,12 +240,12 @@ public class JoltTransformJSON extends AbstractProcessor {
                         .build());
             }
         }
+
         return results;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
-
         final FlowFile original = session.get();
         if (original == null) {
             return;
@@ -241,62 +254,31 @@ public class JoltTransformJSON extends AbstractProcessor {
         final ComponentLog logger = getLogger();
         final StopWatch stopWatch = new StopWatch(true);
 
-        final byte[] originalContent = new byte[(int) original.getSize()];
-        session.read(original, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.fillBuffer(in, originalContent, true);
-            }
-        });
+        final Object inputJson;
+        try (final InputStream in = session.read(original)) {
+            inputJson = JsonUtils.jsonToObject(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", new Object[] {original, e});
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
 
         final String jsonString;
         final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
-
         try {
-
-            final String specString;
-
-            if(context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())){
-                specString = context.isExpressionLanguagePresent(JOLT_SPEC) ? context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(original).getValue() :
-                             context.getProperty(JOLT_SPEC).getValue();
-            }else{
-                specString = null;
-            }
-
-            if(transform == null || (specString != null && !specJsonString.equals(specString)) || (specString == null && SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue()))){
-
-                specJsonString = specString;
-
-                final Object specJson;
-                if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
-                    specJson = JsonUtils.jsonToObject(specJsonString, DEFAULT_CHARSET);
-                }else{
-                    specJson = null;
-                }
-
-                if(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
-                    transform = TransformFactory.getCustomTransform(customClassLoader,context.getProperty(CUSTOM_CLASS).getValue(), specJson);
-                }else {
-                    transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
-                }
-            }
-
-            if(customClassLoader != null) {
+            final JoltTransform transform = getTransform(context, original);
+            if (customClassLoader != null) {
                 Thread.currentThread().setContextClassLoader(customClassLoader);
             }
 
-            final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent);
-            final Object inputJson = JsonUtils.jsonToObject(bais);
             final Object transformedJson = TransformUtils.transform(transform,inputJson);
             jsonString = JsonUtils.toJsonString(transformedJson);
-
-        } catch (Exception ex) {
-            logger.error("Unable to transform {} due to {}", new Object[]{original, ex});
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex});
             session.transfer(original, REL_FAILURE);
             return;
-
-        }finally {
-            if(customClassLoader != null && originalContextClassLoader != null) {
+        } finally {
+            if (customClassLoader != null && originalContextClassLoader != null) {
                 Thread.currentThread().setContextClassLoader(originalContextClassLoader);
             }
         }
@@ -309,27 +291,73 @@ public class JoltTransformJSON extends AbstractProcessor {
         });
 
         final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
-        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(),"application/json");
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
         session.transfer(transformed, REL_SUCCESS);
         session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS));
         logger.info("Transformed {}", new Object[]{original});
+    }
 
+    private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
+        final String specString;
+        if (context.getProperty(JOLT_SPEC).isSet() && !StringUtils.isEmpty(context.getProperty(JOLT_SPEC).getValue())) {
+            specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            specString = null;
+        }
+
+        // Get the transform from our cache, if it exists.
+        JoltTransform transform = null;
+        synchronized (this) {
+            transform = transformCache.get(specString);
+        }
+
+        if (transform != null) {
+            return transform;
+        }
+
+        // If no transform for our spec, create the transform.
+        final Object specJson;
+        if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
+            specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
+        } else {
+            specJson = null;
+        }
+
+        if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
+            transform = TransformFactory.getCustomTransform(customClassLoader, context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+        } else {
+            transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
+        }
+
+        // Check again for the transform in our cache, since it's possible that another thread has
+        // already populated it. If absent from the cache, populate the cache. Otherwise, use the
+        // value from the cache.
+        synchronized (this) {
+            final JoltTransform existingTransform = transformCache.get(specString);
+            if (existingTransform == null) {
+                transformCache.put(specString, transform);
+            } else {
+                transform = existingTransform;
+            }
+        }
+
+        return transform;
     }
 
     @OnScheduled
-    public void setup(final ProcessContext context) {
+    public synchronized void setup(final ProcessContext context) {
+        transformCache.clear();
+        maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
 
-        try{
-            if(context.getProperty(MODULES).isSet()){
-                customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(),this.getClass().getClassLoader(),getJarFilenameFilter());
-            }else{
+        try {
+            if (context.getProperty(MODULES).isSet()) {
+                customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
+            } else {
                 customClassLoader = this.getClass().getClassLoader();
             }
-
-        } catch (Exception ex){
-            getLogger().error("Unable to setup processor",ex);
+        } catch (final Exception ex) {
+            getLogger().error("Unable to setup processor", ex);
         }
-
     }
 
     protected FilenameFilter getJarFilenameFilter(){