You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/04/14 10:22:40 UTC

[camel-k-runtime] branch master updated: fix #44 : Support StreamCaching configuration through a ContextCustomizer.

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f3502  fix #44 : Support StreamCaching configuration through a ContextCustomizer.
e1f3502 is described below

commit e1f3502d3946d9ac1e3cc7c06614ddeb3f8a2178
Author: Andrea Tarocchi <at...@redhat.com>
AuthorDate: Sat Apr 13 00:02:46 2019 +0200

    fix #44 : Support StreamCaching configuration through a ContextCustomizer.
---
 .../cutomizer/StreamCachingContextCustomizer.java  | 124 +++++++++++++++++++++
 .../org/apache/camel/k/customizer/streamcaching    |  18 +++
 .../StreamCachingContextCustomizerTest.java        |  60 ++++++++++
 3 files changed, 202 insertions(+)

diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java
new file mode 100644
index 0000000..dcfa099
--- /dev/null
+++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java
@@ -0,0 +1,124 @@
+package org.apache.camel.k.cutomizer;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.k.ContextCustomizer;
+import org.apache.camel.k.Runtime;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamCachingContextCustomizer implements ContextCustomizer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamCachingContextCustomizer.class);
+
+    private boolean enabled;
+    private boolean anySpoolRules;
+    private int bufferSize;
+    private boolean removeSpoolDirectoryWhenStopping;
+    private String spoolChiper;
+    private String spoolDirectory;
+    private long spoolThreshold;
+    private String spoolUsedHeapMemoryLimit;
+    private int spoolUsedHeapMemoryThreshold;
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public String getSpoolChiper() {
+        return spoolChiper;
+    }
+
+    public void setSpoolChiper(String spoolChiper) {
+        this.spoolChiper = spoolChiper;
+    }
+
+    public String getSpoolDirectory() {
+        return spoolDirectory;
+    }
+
+    public void setSpoolDirectory(String spoolDirectory) {
+        this.spoolDirectory = spoolDirectory;
+    }
+
+    public long getSpoolThreshold() {
+        return spoolThreshold;
+    }
+
+    public void setSpoolThreshold(long spoolThreshold) {
+        this.spoolThreshold = spoolThreshold;
+    }
+
+    public String getSpoolUsedHeapMemoryLimit() {
+        return spoolUsedHeapMemoryLimit;
+    }
+
+    public void setSpoolUsedHeapMemoryLimit(String spoolUsedHeapMemoryLimit) {
+        this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit;
+    }
+
+    public int getSpoolUsedHeapMemoryThreshold() {
+        return spoolUsedHeapMemoryThreshold;
+    }
+
+    public void setSpoolUsedHeapMemoryThreshold(int spoolUsedHeapMemoryThreshold) {
+        this.spoolUsedHeapMemoryThreshold = spoolUsedHeapMemoryThreshold;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    public boolean isAnySpoolRules() {
+        return anySpoolRules;
+    }
+
+    public void setAnySpoolRules(boolean anySpoolRules) {
+        this.anySpoolRules = anySpoolRules;
+    }
+
+    public boolean isRemoveSpoolDirectoryWhenStopping() {
+        return removeSpoolDirectoryWhenStopping;
+    }
+
+    public void setRemoveSpoolDirectoryWhenStopping(boolean removeSpoolDirectoryWhenStopping) {
+        this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping;
+    }
+
+    @Override
+    public void apply(CamelContext camelContext, Runtime.Registry runtimeRegistry) {
+        camelContext.setStreamCaching(isEnabled());
+        camelContext.getStreamCachingStrategy().setAnySpoolRules(isAnySpoolRules());
+        camelContext.getStreamCachingStrategy().setBufferSize(getBufferSize());
+        camelContext.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(isRemoveSpoolDirectoryWhenStopping());
+        camelContext.getStreamCachingStrategy().setSpoolChiper(getSpoolChiper());
+        if (getSpoolDirectory() != null) {
+            camelContext.getStreamCachingStrategy().setSpoolDirectory(getSpoolDirectory());
+        }
+        if (getSpoolThreshold() != 0) {
+            camelContext.getStreamCachingStrategy().setSpoolThreshold(getSpoolThreshold());
+        }
+        if (getSpoolUsedHeapMemoryLimit() != null) {
+            StreamCachingStrategy.SpoolUsedHeapMemoryLimit limit;
+            if ("Committed".equalsIgnoreCase(getSpoolUsedHeapMemoryLimit())) {
+                limit = StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Committed;
+            } else if ("Max".equalsIgnoreCase(getSpoolUsedHeapMemoryLimit())) {
+                limit = StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Max;
+            } else {
+                throw new IllegalArgumentException("Invalid option " + getSpoolUsedHeapMemoryLimit() + " must either be Committed or Max");
+            }
+            camelContext.getStreamCachingStrategy().setSpoolUsedHeapMemoryLimit(limit);
+        }
+        if (getSpoolUsedHeapMemoryThreshold() != 0) {
+            camelContext.getStreamCachingStrategy().setSpoolUsedHeapMemoryThreshold(getSpoolUsedHeapMemoryThreshold());
+        }
+        LOGGER.info("Configured camel context through CamelContextCustomizer.class");
+    }
+}
diff --git a/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching b/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching
new file mode 100644
index 0000000..58efa10
--- /dev/null
+++ b/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.k.cutomizer.StreamCachingContextCustomizer
diff --git a/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java b/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java
new file mode 100644
index 0000000..c3aab7f
--- /dev/null
+++ b/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java
@@ -0,0 +1,60 @@
+package org.apache.camel.k.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.k.InMemoryRegistry;
+import org.apache.camel.k.cutomizer.StreamCachingContextCustomizer;
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class StreamCachingContextCustomizerTest { // exercises StreamCachingContextCustomizer.apply against a DefaultCamelContext
+
+    @Test
+    public void testClasspathHandler() { // every option set on the customizer must be readable back from the context afterwards
+        StreamCachingContextCustomizer customizer = new StreamCachingContextCustomizer();
+
+        customizer.setEnabled(true);
+        customizer.setAnySpoolRules(true);
+        customizer.setRemoveSpoolDirectoryWhenStopping(true);
+        customizer.setBufferSize(9);
+        customizer.setSpoolThreshold(9);
+        customizer.setSpoolUsedHeapMemoryThreshold(9);
+        customizer.setSpoolChiper("sha");
+        customizer.setSpoolDirectory("./xxx");
+        customizer.setSpoolUsedHeapMemoryLimit("Committed");
+
+        CamelContext context = new DefaultCamelContext();
+        customizer.apply(context, new InMemoryRegistry());
+        StreamCachingStrategy strategy = context.getStreamCachingStrategy();
+
+        assertThat(context.isStreamCaching()).isTrue();
+        assertThat(strategy.isAnySpoolRules()).isTrue();
+        assertThat(strategy.isRemoveSpoolDirectoryWhenStopping()).isTrue();
+        assertThat(strategy.getBufferSize()).isEqualTo(9);
+        assertThat(strategy.getSpoolThreshold()).isEqualTo(9L);
+        assertThat(strategy.getSpoolUsedHeapMemoryThreshold()).isEqualTo(9);
+        assertThat(strategy.getSpoolChiper()).isEqualTo("sha");
+        assertThat(strategy.getSpoolDirectory()).isNull(); // still null although "./xxx" was set - presumably resolved only once the strategy starts; TODO confirm
+        assertThat(strategy.getSpoolUsedHeapMemoryLimit()).isEqualTo(StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Committed);
+
+        customizer.setSpoolUsedHeapMemoryLimit("Max"); // second pass: only the heap-memory limit changes
+        customizer.apply(context, new InMemoryRegistry());
+        assertThat(context.getStreamCachingStrategy().getSpoolUsedHeapMemoryLimit()).isEqualTo(StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Max);
+    }
+
+    @Test
+    public void testUnsupportedStreamCachingSpoolUsedHeapMemoryLimit() { // a limit other than Committed/Max must be rejected
+        StreamCachingContextCustomizer customizer = new StreamCachingContextCustomizer();
+        customizer.setSpoolUsedHeapMemoryLimit("Unsupported");
+        CamelContext context = new DefaultCamelContext();
+
+        IllegalArgumentException thrown = assertThrows(
+            IllegalArgumentException.class,
+            () -> customizer.apply(context, new InMemoryRegistry()));
+
+        assertThat(thrown.getMessage()).isEqualTo("Invalid option Unsupported must either be Committed or Max");
+    }
+}