You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/17 13:39:44 UTC
[4/8] git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI
to make it easier to configure and allow 3rd parties to plug in custom
strategies. Work in progress.
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd parties to plug in custom strategies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f1f901a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f1f901a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f1f901a
Branch: refs/heads/master
Commit: 3f1f901a0cb6c52f8e7a10768fe1114f0a294b32
Parents: f0180ca
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 17 13:05:49 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 17 13:05:49 2013 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 13 +++-
.../converter/stream/CachedOutputStream.java | 12 ++--
.../apache/camel/impl/DefaultCamelContext.java | 13 ++++
.../impl/DefaultStreamCachingStrategy.java | 68 ++++++++++++++++++++
.../apache/camel/spi/StreamCachingStrategy.java | 29 +++++++++
.../java/org/apache/camel/util/FileUtil.java | 33 ++++++----
6 files changed, 147 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 9e747c5..6db32ba 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -51,6 +51,7 @@ import org.apache.camel.spi.Registry;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.spi.UuidGenerator;
import org.apache.camel.util.LoadPropertiesException;
@@ -1207,9 +1208,19 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
*/
Map<String, Properties> findComponents() throws LoadPropertiesException, IOException;
-
/**
* Returns the HTML documentation for the given camel component
*/
String getComponentDocumentation(String componentName) throws IOException;
+
+ /**
+ * Gets the {@link StreamCachingStrategy} to use.
+ */
+ StreamCachingStrategy getStreamCachingStrategy();
+
+ /**
+ * Sets a custom {@link StreamCachingStrategy} to use.
+ */
+ void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy);
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index a2cf0a1..f12a111 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -55,7 +55,7 @@ public class CachedOutputStream extends OutputStream {
public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
private static final transient Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
-
+
private OutputStream currentStream;
private boolean inMemory = true;
private int totalLength;
@@ -68,12 +68,12 @@ public class CachedOutputStream extends OutputStream {
private String cipherTransformation;
private CipherPair ciphers;
-
public CachedOutputStream(Exchange exchange) { // convenience constructor
this(exchange, true); // delegates to the two-argument constructor with closedOnCompletion = true
}
public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
+ // TODO: these options should be on StreamCachingStrategy
String bufferSize = exchange.getContext().getProperty(BUFFER_SIZE);
String hold = exchange.getContext().getProperty(THRESHOLD);
String dir = exchange.getContext().getProperty(TEMP_DIR);
@@ -86,6 +86,8 @@ public class CachedOutputStream extends OutputStream {
}
if (dir != null) {
this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
+ } else {
+ this.outputDir = exchange.getContext().getStreamCachingStrategy().getTemporaryDirectory();
}
this.cipherTransformation = exchange.getContext().getProperty(CIPHER_TRANSFORMATION);
@@ -222,11 +224,7 @@ public class CachedOutputStream extends OutputStream {
flush();
ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
- if (outputDir == null) {
- tempFile = FileUtil.createTempFile("cos", ".tmp");
- } else {
- tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
- }
+ tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
LOG.trace("Creating temporary stream cache file: {}", tempFile);
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 9063095..e4017be 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -119,6 +119,7 @@ import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.camel.spi.UuidGenerator;
import org.apache.camel.support.ServiceSupport;
@@ -195,6 +196,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private FactoryFinderResolver factoryFinderResolver = new DefaultFactoryFinderResolver();
private FactoryFinder defaultFactoryFinder;
private PropertiesComponent propertiesComponent;
+ private StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy();
private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>();
private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>();
private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<String, RouteService>();
@@ -1679,6 +1681,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
threshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
}
log.info("Stream caching is enabled, and using {} kb as threshold for overflow and spooling to disk store.", threshold / 1024);
+
+ // stream caching is in use so enable the strategy
+ addService(streamCachingStrategy);
}
// start routes
@@ -2660,6 +2665,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
this.uuidGenerator = uuidGenerator;
}
+ public StreamCachingStrategy getStreamCachingStrategy() { // returns the strategy currently held by this context
+ return streamCachingStrategy; // initialised to a DefaultStreamCachingStrategy by the field declaration
+ }
+
+ public void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy) { // replaces the strategy held by this context
+ this.streamCachingStrategy = streamCachingStrategy; // NOTE(review): only swaps the field; addService(streamCachingStrategy) elsewhere registers whichever instance is set at that moment - confirm what happens if this is called afterwards
+ }
+
@Override
public String getProperty(String name) {
String value = getProperties().get(name);
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
new file mode 100644
index 0000000..3333d77
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.io.File;
+
+import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements StreamCachingStrategy {
+ // Default StreamCachingStrategy: holds the temporary directory, makes sure it exists in doStart() and removes it in doStop().
+ // TODO: Add options to configure more settings, such as the overflow size, etc.
+ // TODO: Add JMX management
+ // TODO: Maybe use #syntax# for the default temp dir so people can easily configure this
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class);
+ private File temporaryDirectory; // null until set via setTemporaryDirectory or assigned in doStart
+
+ public void setTemporaryDirectory(File path) { // stores the directory as given; no validation or creation happens here
+ this.temporaryDirectory = path;
+ }
+
+ public File getTemporaryDirectory() { // returns the configured directory, or null if none has been set or created yet
+ return temporaryDirectory;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // no directory configured: obtain a fresh one from FileUtil.createNewTempDir(); otherwise make sure the configured one exists
+ if (temporaryDirectory == null) {
+ temporaryDirectory = FileUtil.createNewTempDir();
+ LOG.info("Created temporary directory {}", temporaryDirectory);
+ } else {
+ if (!temporaryDirectory.exists()) { // an already existing directory is used as-is, nothing is logged
+ boolean created = temporaryDirectory.mkdirs(); // creates missing parent directories as well
+ if (!created) {
+ LOG.warn("Cannot create temporary directory {}", temporaryDirectory); // NOTE(review): failure is only logged; startup continues with a directory that could not be created
+ } else {
+ LOG.info("Created temporary directory {}", temporaryDirectory);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (temporaryDirectory != null) { // NOTE(review): no record is kept of who created the directory, so one supplied via setTemporaryDirectory (even if it existed before start) is removed too - confirm this is intended
+ LOG.info("Removing temporary directory {}", temporaryDirectory);
+ FileUtil.removeDir(temporaryDirectory); // the field itself is not reset after removal
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
new file mode 100644
index 0000000..cc5ad2e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.io.File;
+
+/**
+ * Strategy for using <a href="http://camel.apache.org/stream-caching.html">stream caching</a>; at present it only exposes the temporary directory used for stream cache files.
+ */
+public interface StreamCachingStrategy {
+
+ void setTemporaryDirectory(File path); // sets the directory in which temporary stream cache files are to be created
+
+ File getTemporaryDirectory(); // returns the directory in use; the default implementation returns null until a directory is set or the strategy is started
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f1f901a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java b/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
index 339a5da..22b691d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
+++ b/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
@@ -285,6 +285,24 @@ public final class FileUtil {
return defaultTempDir;
}
+ defaultTempDir = createNewTempDir();
+
+ // create shutdown hook to remove the temp dir
+ shutdownHook = new Thread() {
+ @Override
+ public void run() {
+ removeDir(defaultTempDir);
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ return defaultTempDir;
+ }
+
+ /**
+ * Creates a new temporary directory in the <tt>java.io.tmpdir</tt> directory.
+ */
+ public static File createNewTempDir() {
String s = System.getProperty("java.io.tmpdir");
File checkExists = new File(s);
if (!checkExists.exists()) {
@@ -304,18 +322,7 @@ public final class FileUtil {
f = new File(s, "camel-tmp-" + x);
}
- defaultTempDir = f;
-
- // create shutdown hook to remove the temp dir
- shutdownHook = new Thread() {
- @Override
- public void run() {
- removeDir(defaultTempDir);
- }
- };
- Runtime.getRuntime().addShutdownHook(shutdownHook);
-
- return defaultTempDir;
+ return f;
}
/**
@@ -332,7 +339,7 @@ public final class FileUtil {
}
}
- private static void removeDir(File d) {
+ public static void removeDir(File d) {
String[] list = d.list();
if (list == null) {
list = new String[0];