You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:32 UTC
[31/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java b/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
deleted file mode 100644
index 8afc6cc..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import java.io.File;
-import java.nio.file.WatchService;
-import java.util.Comparator;
-
-import edgent.connectors.file.runtime.DirectoryWatcher;
-import edgent.connectors.file.runtime.IFileWriterPolicy;
-import edgent.connectors.file.runtime.TextFileReader;
-import edgent.connectors.file.runtime.TextFileWriter;
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.Supplier;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * {@code FileStreams} is a connector for integrating with file system objects.
- * <p>
- * File stream operations include:
- * <ul>
- * <li>Write tuples to text files - {@link #textFileWriter(TStream, Supplier, Supplier) textFileWriter}</li>
- * <li>Watch a directory for new files - {@link #directoryWatcher(TopologyElement, Supplier) directoryWatcher}</li>
- * <li>Create tuples from text files - {@link #textFileReader(TStream, Function, BiFunction) textFileReader}</li>
- * </ul>
- */
-public class FileStreams {
- @SuppressWarnings("unused")
- private static final FileStreams forCodeCoverage = new FileStreams();
- private FileStreams() {};
-
- /**
- * Declare a stream containing the absolute pathname of
- * newly created file names from watching {@code directory}.
- * <p>
- * This is the same as {@code directoryWatcher(t, () -> dir, null)}.
- *
- * @param te topology element whose topology the watcher will be added to
- * @param directory
- * Name of the directory to watch.
- * @return Stream containing absolute pathnames of newly created files in
- * {@code directory}.
- */
- public static TStream<String> directoryWatcher(TopologyElement te,
- Supplier<String> directory) {
- return directoryWatcher(te, directory, null);
- }
-
- /**
- * Declare a stream containing the absolute pathname of
- * newly created file names from watching {@code directory}.
- * <p>
- * Hidden files (java.io.File.isHidden()==true) are ignored.
- * This is compatible with {@code textFileWriter}.
- * <p>
- * Sample use:
- * <pre>{@code
- * String dir = "/some/directory/path";
- * Topology t = ...
- * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir, null);
- * }</pre>
- * <p>
- * The order of the files in the stream is dictated by a {@link Comparator}.
- * The default comparator orders files by {@link File#lastModified()} values.
- * There are no guarantees on the processing order of files that
- * have the same lastModified value.
- * Note, lastModified values are subject to filesystem timestamp
- * quantization - e.g., 1second.
- * <p>
- * Note: due to the asynchronous nature of things, if files in the
- * directory may be removed, the receiver of a tuple with a "new" file
- * pathname may need to be prepared for the pathname to no longer be
- * valid when it receives the tuple or during its processing of the tuple.
- * <p>
- * The behavior on MacOS may be unsavory, even as recent as Java8, as
- * MacOS Java lacks a native implementation of {@link WatchService}.
- * The result can be a delay in detecting newly created files (e.g., 10sec)
- * as well as not detecting rapid deletion and recreation of a file.
- *
- * @param te topology element whose topology the watcher will be added to
- * @param directory
- * Name of the directory to watch.
- * @param comparator
- * Comparator to use to order newly seen file pathnames.
- * May be null.
- * @return Stream containing absolute pathnames of newly created files in
- * {@code directory}.
- */
- public static TStream<String> directoryWatcher(TopologyElement te,
- Supplier<String> directory, Comparator<File> comparator) {
- return te.topology().source(() -> new DirectoryWatcher(directory, comparator));
- }
-
- /**
- * Declare a stream containing the lines read from the files
- * whose pathnames correspond to each tuple on the {@code pathnames}
- * stream.
- * <p>
- * This is the same as {@code textFileReader(pathnames, null, null)}
- * <p>
- * Sample use:
- * <pre>{@code
- * String dir = "/some/directory/path";
- * Topology t = ...
- * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir);
- * TStream<String> contents = FileStreams.textFileReader(pathnames);
- * contents.print();
- * }</pre>
- *
- * @param pathnames
- * Stream containing pathnames of files to read.
- * @return Stream containing lines from the files.
- */
- public static TStream<String> textFileReader(TStream<String> pathnames) {
- return textFileReader(pathnames, null, null);
- }
-
- /**
- * Declare a stream containing the lines read from the files
- * whose pathnames correspond to each tuple on the {@code pathnames}
- * stream.
- * <p>
- * All files are assumed to be encoded in UTF-8. The lines are
- * output in the order they appear in each file, with the first line of
- * a file appearing first. A file is not subsequently monitored for
- * additional lines.
- * <p>
- * If a file can not be read, e.g., a file doesn't exist at that pathname
- * or the pathname is for a directory,
- * an error will be logged.
- * <p>
- * Optional {@code preFn} and {@code postFn} functions may be supplied.
- * These are called prior to processing a tuple (pathname) and after
- * respectively. They provide a way to encode markers in the generated
- * stream.
- * <p>
- * Sample use:
- * <pre>{@code
- * // watch a directory for files, creating a stream with the contents of
- * // each file. Use a preFn to include a file separator marker in the
- * // stream. Use a postFn to delete a file once it's been processed.
- * String dir = "/some/directory/path";
- * Topology t = ...
- * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir);
- * TStream<String> contents = FileStreams.textFileReader(
- * pathnames,
- * path -> { return "###<PATH-MARKER>### " + path; },
- * (path,exception) -> { new File(path).delete(); return null; }
- * );
- * contents.print();
- * }</pre>
- *
- * @param pathnames
- * Stream containing pathnames of files to read.
- * @param preFn
- * Pre-visit {@code Function<String,String>}.
- * The input is the pathname.
- * The result, when non-null, is added to the output stream.
- * The function may be null.
- * @param postFn
- * Post-visit {@code BiFunction<String,Exception,String>}.
- * The input is the pathname and an exception. The exception
- * is null if there were no errors.
- * The result, when non-null, is added to the output stream.
- * The function may be null.
- * @return Stream containing lines from the files.
- */
- public static TStream<String> textFileReader(TStream<String> pathnames,
- Function<String,String> preFn, BiFunction<String,Exception,String> postFn) {
-
- TextFileReader reader = new TextFileReader();
- reader.setPre(preFn);
- reader.setPost(postFn);
- return pathnames.pipe(reader);
- }
-
- /**
- * Write the contents of a stream to files.
- * <p>
- * The default {@link FileWriterPolicy} is used.
- * <p>
- * This is the same as {@code textFileWriter(contents, basePathname, null)}.
- * <p>
- * Sample use:
- * <pre>{@code
- * // write a stream of LogEvent to files, using the default
- * // file writer policy
- * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS
- * TStream<MyLogEvent> events = ...
- * TStream<String> stringEvents = events.map(event -> event.toString());
- * FileStreams.textFileWriter(stringEvents, () -> basePathname);
- * }</pre>
- * @param contents the lines to write
- * @param basePathname the base pathname of the created files
- * @return a TSink
- */
- public static TSink<String> textFileWriter(TStream<String> contents,
- Supplier<String> basePathname) {
- return textFileWriter(contents, basePathname, null);
- }
-
- /**
- * Write the contents of a stream to files subject to the control
- * of a file writer policy.
- * <p>
- * A separate policy instance must be used for each invocation.
- * A default {@link FileWriterPolicy} is used if a policy is not specified.
- * <p>
- * Sample use:
- * <pre>{@code
- * // write a stream of LogEvent to files using a policy of:
- * // no additional flush, 100 events per file, retain 5 files
- * IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- * FileWriterFlushConfig.newImplicitConfig(),
- * FileWriterCycleConfig.newCountBasedConfig(100),
- * FileWriterRetentionConfig.newFileCountBasedConfig(5)
- * );
- * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS
- * TStream<MyLogEvent> events = ...
- * TStream<String> stringEvents = events.map(event -> event.toString());
- * FileStreams.textFileWriter(stringEvents, () -> basePathname, () -> policy);
- * }</pre>
- * @param contents the lines to write
- * @param basePathname the base pathname of the created files
- * @param policy the policy to use. may be null.
- * @return a TSink
- * @see FileWriterPolicy
- */
- public static TSink<String> textFileWriter(TStream<String> contents,
- Supplier<String> basePathname, Supplier<IFileWriterPolicy<String>> policy) {
- if (policy == null) {
- IFileWriterPolicy<String> defaultPolicy = new FileWriterPolicy<String>(){};
- policy = () -> defaultPolicy;
- }
- return contents.sink(new TextFileWriter(basePathname, policy));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
deleted file mode 100644
index 7053f37..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import edgent.function.Predicate;
-
-/**
- * FileWriter active file cycle configuration control.
- * <p>
- * Cycling the active file closes it, gets it to its final pathname,
- * and induces the application of a retention policy
- * {@link FileWriterRetentionConfig}.
- * <p>
- * Cycling the active file can be any combination of:
- * <ul>
- * <li>after {@code fileSize} bytes have been written</li>
- * <li>after every {@code cntTuple} tuples written</li>
- * <li>after {@code tuplePredicate} returns true</li>
- * <li>after {@code periodMsec} has elapsed since the last time based cycle</li>
- * </ul>
- *
- * @param <T> stream tuple type
- */
-public class FileWriterCycleConfig<T> {
- private long fileSize;
- private int cntTuples;
- private long periodMsec;
- private Predicate<T> tuplePredicate;
-
- /** same as {@code newConfig(fileSize, 0, 0, null)}
- *
- * @param <T> Tuple type
- * @param fileSize cycle after {@code fileSize} bytes have been written. Must be at least 1.
- * @return the cycle configuration
- */
- public static <T> FileWriterCycleConfig<T> newFileSizeBasedConfig(long fileSize) {
- if (fileSize < 1)
- throw new IllegalArgumentException("fileSize");
- return newConfig(fileSize, 0, 0, null);
- }
- /** same as {@code newConfig(0, cntTuples, 0, null)}
- *
- * @param <T> Tuple type
- * @param cntTuples cycle after every {@code cntTuple} tuples have been written. Must be at least 1.
- * @return the cycle configuration
- */
- public static <T> FileWriterCycleConfig<T> newCountBasedConfig(int cntTuples) {
- if (cntTuples < 1)
- throw new IllegalArgumentException("cntTuples");
- return newConfig(0, cntTuples, 0, null);
- }
- /** same as {@code newConfig(0, 0, periodMsec, null)}
- *
- * @param <T> Tuple type
- * @param periodMsec cycle after {@code periodMsec} has elapsed since the last time based cycle. Must be at least 1.
- * @return the cycle configuration
- */
- public static <T> FileWriterCycleConfig<T> newTimeBasedConfig(long periodMsec) {
- if (periodMsec < 1)
- throw new IllegalArgumentException("periodMsec");
- return newConfig(0, 0, periodMsec, null);
- }
- /** same as {@code newConfig(0, 0, 0, tuplePredicate)}
- *
- * @param <T> Tuple type
- * @param tuplePredicate cycle if {@code tuplePredicate} returns true. Must not be null.
- * @return the cycle configuration
- */
- public static <T> FileWriterCycleConfig<T> newPredicateBasedConfig(Predicate<T> tuplePredicate) {
- return newConfig(0, 0, 0, tuplePredicate);
- }
-
- /**
- * Create a new configuration.
- * <p>
- * At least one configuration mode must be enabled.
- * @param <T> Tuple type
- * @param fileSize cycle after {@code fileSize} bytes have been written. 0 to disable.
- * @param cntTuples cycle after every {@code cntTuple} tuples have been written. 0 to disable.
- * @param periodMsec cycle after {@code periodMsec} has elapsed since the last time based cycle. 0 to disable.
- * @param tuplePredicate cycle if {@code tuplePredicate} returns true. null to disable.
- * @return the cycle configuration
- */
- public static <T> FileWriterCycleConfig<T> newConfig(long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
- return new FileWriterCycleConfig<>(fileSize, cntTuples, periodMsec, tuplePredicate);
- }
-
- private FileWriterCycleConfig(long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
- if (fileSize < 0)
- throw new IllegalArgumentException("fileSize");
- if (cntTuples < 0)
- throw new IllegalArgumentException("cntTuples");
- if (periodMsec < 0)
- throw new IllegalArgumentException("periodMsec");
- if (fileSize==0 && cntTuples==0 && periodMsec==0 && tuplePredicate==null)
- throw new IllegalArgumentException("no cycle configuration specified");
-
- this.fileSize = fileSize;
- this.cntTuples = cntTuples;
- this.periodMsec = periodMsec;
- this.tuplePredicate = tuplePredicate;
- }
-
- /**
- * Get the file size configuration value.
- * @return the value
- */
- public long getFileSize() { return fileSize; }
-
- /**
- * Get the tuple count configuration value.
- * @return the value
- */
- public int getCntTuples() { return cntTuples; }
-
- /**
- * Get the time period configuration value.
- * @return the value
- */
- public long getPeriodMsec() { return periodMsec; }
-
- /**
- * Get the tuple predicate configuration value.
- * @return the value
- */
- public Predicate<T> getTuplePredicate() { return tuplePredicate; }
-
- /**
- * Evaluate if the specified values indicate that a cycling of
- * the active file should be performed.
- * @param fileSize the number of bytes written to the active file
- * @param nTuples number of tuples written to the active file
- * @param tuple the tuple written to the file
- * @return true if a cycle action should be performed.
- */
- public boolean evaluate(long fileSize, int nTuples, T tuple) {
- return (this.fileSize > 0 && fileSize > this.fileSize)
- || (cntTuples > 0 && nTuples > 0 && nTuples % cntTuples == 0)
- || (tuplePredicate != null && tuplePredicate.test(tuple));
- }
-
- @Override
- public String toString() {
- return String.format("fileSize:%d cntTuples:%d periodMsec:%d tuplePredicate:%s",
- getFileSize(), getCntTuples(), getPeriodMsec(),
- getTuplePredicate() == null ? "no" : "yes");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
deleted file mode 100644
index 55d691b..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import edgent.function.Predicate;
-
-/**
- * FileWriter active file flush configuration control.
- * <p>
- * Flushing of the active file can be any combination of:
- * <ul>
- * <li>after every {@code cntTuple} tuples written</li>
- * <li>after {@code tuplePredicate} returns true</li>
- * <li>after {@code periodMsec} has elapsed since the last time based flush</li>
- * </ul>
- * If nothing specific is specified, the underlying buffered
- * writer's automatic flushing is utilized.
- *
- * @param <T> stream tuple type
- */
-public class FileWriterFlushConfig<T> {
- private int cntTuples;
- private long periodMsec;
- private Predicate<T> tuplePredicate;
-
- /**
- * Create a new configuration.
- * <p>
- * The underlying buffered writer's automatic flushing is used.
- * <p>
- * Same as {@code newConfig(0, 0, null)}
- *
- * @param <T> Tuple type
- * @return the flush configuration
- */
- public static <T> FileWriterFlushConfig<T> newImplicitConfig() {
- return newConfig(0,0,null);
- }
- /** same as {@code newConfig(cntTuples, 0, null)}
- *
- * @param <T> Tuple type
- * @param cntTuples flush every {@code cntTuple} tuples written. Must be at least 1.
- * @return the flush configuration
- */
- public static <T> FileWriterFlushConfig<T> newCountBasedConfig(int cntTuples) {
- if (cntTuples < 1)
- throw new IllegalArgumentException("cntTuples");
- return newConfig(cntTuples, 0, null);
- }
- /** same as {@code newConfig(0, periodMsec, null)}
- *
- * @param <T> Tuple type
- * @param periodMsec flush every {@code periodMsec} milliseconds. Must be at least 1.
- * @return the flush configuration
- */
- public static <T> FileWriterFlushConfig<T> newTimeBasedConfig(long periodMsec) {
- if (periodMsec < 1)
- throw new IllegalArgumentException("periodMsec");
- return newConfig(0, periodMsec, null);
- }
- /** same as {@code newConfig(0, 0, tuplePredicate)}
- *
- * @param <T> Tuple type
- * @param tuplePredicate flush if {@code tuplePredicate} is true. Must not be null.
- * @return the flush configuration
- */
- public static <T> FileWriterFlushConfig<T> newPredicateBasedConfig(Predicate<T> tuplePredicate) {
- if (tuplePredicate == null)
- throw new IllegalArgumentException("tuplePredicate");
- return newConfig(0, 0, tuplePredicate);
- }
- /**
- * Create a new configuration.
- * <p>
- * If nothing specific is specified, the underlying buffered
- * writer's automatic flushing is utilized.
- *
- * @param <T> Tuple type
- * @param cntTuples flush every {@code cntTuple} tuples written. 0 to disable.
- * @param periodMsec flush every {@code periodMsec} milliseconds. 0 to disable.
- * @param tuplePredicate flush if {@code tuplePredicate} is true. null to disable.
- * @return the flush configuration
- */
- public static <T> FileWriterFlushConfig<T> newConfig(int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
- return new FileWriterFlushConfig<>(cntTuples, periodMsec, tuplePredicate);
- }
-
- private FileWriterFlushConfig(int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
- if (cntTuples < 0)
- throw new IllegalArgumentException("cntTuples");
- if (periodMsec < 0)
- throw new IllegalArgumentException("periodMsec");
- this.cntTuples = cntTuples;
- this.periodMsec = periodMsec;
- this.tuplePredicate = tuplePredicate;
- }
-
- /**
- * Get the tuple count configuration value.
- * @return the value
- */
- public int getCntTuples() { return cntTuples; }
-
- /**
- * Get the time period configuration value.
- * @return the value
- */
- public long getPeriodMsec() { return periodMsec; }
-
- /**
- * Get the tuple predicate configuration value.
- * @return the value
- */
- public Predicate<T> getTuplePredicate() { return tuplePredicate; }
-
- /**
- * Evaluate if the specified values indicate that a flush should be
- * performed.
- * @param nTuples number of tuples written to the active file
- * @param tuple the tuple written to the file
- * @return true if a flush should be performed.
- */
- public boolean evaluate(int nTuples, T tuple) {
- return (cntTuples > 0 && nTuples > 0 && nTuples % cntTuples == 0)
- || (tuplePredicate != null && tuplePredicate.test(tuple));
- }
-
- @Override
- public String toString() {
- return String.format("cntTuples:%d periodMsec:%d tuplePredicate:%s",
- getCntTuples(), getPeriodMsec(),
- getTuplePredicate() == null ? "no" : "yes");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
deleted file mode 100644
index c7849c4..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.Flushable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.file.runtime.FileConnector;
-import edgent.connectors.file.runtime.IFileWriterPolicy;
-
-/**
- * A full featured {@link IFileWriterPolicy} implementation.
- * <p>
- * The policy implements strategies for:
- * <ul>
- * <li>Active and final file pathname control.</li>
- * <li>Active file flush control (via {@link FileWriterFlushConfig})</li>
- * <li>Active file cycle control (when to close/finalize the current active file;
- * via {@link FileWriterCycleConfig})</li>
- * <li>file retention control (via {@link FileWriterRetentionConfig})</li>
- * </ul>
- * The policy is very configurable. If additional flexibility is required
- * the class can be extended and documented "hook" methods overridden,
- * or an alternative full implementation of {@code IFileWriterPolicy} can be
- * created.
- * <p>
- * Sample use:
- * <pre>
- * FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- * FileWriterFlushConfig.newImplicitConfig(),
- * FileWriterCycleConfig.newCountBasedConfig(1000),
- * FileWriterRetentionConfig.newFileCountBasedConfig(10));
- * String basePathname = "/some/directory/and_base_name";
- *
- * TStream<String> streamToWrite = ...
- * FileStreams.textFileWriter(streamToWrite, () -> basePathname, () -> policy)
- * </pre>
- *
- * @param <T> stream tuple type
- * @see FileWriterFlushConfig
- * @see FileWriterCycleConfig
- * @see FileWriterRetentionConfig
- */
-public class FileWriterPolicy<T> implements IFileWriterPolicy<T> {
- private static final Logger trace = FileConnector.getTrace();
- private final FileWriterFlushConfig<T> flushConfig;
- private final FileWriterCycleConfig<T> cycleConfig;
- private final FileWriterRetentionConfig retentionConfig;
- private String basePathname;
- private Path parent;
- private String baseLeafname;
- private Flushable flushable;
- private Closeable closeable;
- private volatile int curTupleCnt;
- private volatile long curSize;
- private volatile boolean flushIt;
- private volatile boolean cycleIt;
- private volatile String lastYmdhms;
- private volatile int lastMinorSuffix;
- private final List<Path> retainedPaths = new ArrayList<>(); // oldest first
- private volatile ScheduledExecutorService executor;
-
- /**
- * Create a new file writer policy instance.
- * <p>
- * The configuration is:
- * <ul>
- * <li>10 second time based active file flushing</li>
- * <li>1MB file size based active file cycling</li>
- * <li>10 file retention count</li>
- * </ul>
- * The active and final file pathname behavior is specified in
- * {@link #FileWriterPolicy(FileWriterFlushConfig, FileWriterCycleConfig, FileWriterRetentionConfig)}
- */
- public FileWriterPolicy() {
- this(FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.SECONDS.toMillis(10)),
- FileWriterCycleConfig.newFileSizeBasedConfig(1*1024*1024),
- FileWriterRetentionConfig.newFileCountBasedConfig(10));
- }
-
- /**
- * Create a new file writer policy instance.
- * <p>
- * {@code flushConfig}, {@code cycleConfig} and {@code retentionConfig}
- * specify the configuration of the various controls.
- * <p>
- * The active file and final file pathnames are based
- * on the {@code basePathname} received in
- * {@link #initialize(String, Flushable, Closeable)}.
- * <p>
- * Where {@code parent} and {@code baseLeafname} are the
- * parent path and file name respectively of {@code basePathname}:
- * <ul>
- * <li>the active file is {@code parent/.baseLeafname}</li>
- * <li>final file names are {@code parent/baseLeafname_YYYYMMDD_HHMMSS[_<n>]}
- * where the optional {@code _<n>} suffix is only present if needed
- * to distinguish a file from the previously finalized file.
- * {@code <n>} starts at 1 and is monotonically incremented.
- * </li>
- * </ul>
- * @param flushConfig active file flush control configuration
- * @param cycleConfig active file cycle control configuration
- * @param retentionConfig final file retention control configuration
- */
- public FileWriterPolicy(FileWriterFlushConfig<T> flushConfig,
- FileWriterCycleConfig<T> cycleConfig,
- FileWriterRetentionConfig retentionConfig) {
- this.flushConfig = flushConfig;
- this.cycleConfig = cycleConfig;
- this.retentionConfig = retentionConfig;
- }
-
- @Override
- public void close() {
- if (executor != null) {
- executor.shutdownNow();
- }
- }
-
- /**
- * Get the policy's active file flush configuration
- * @return the flush configuration
- */
- public FileWriterFlushConfig<T> getFlushConfig() {
- return flushConfig;
- }
-
- /**
- * Get the policy's active file cycle configuration
- * @return the cycle configuration
- */
- public FileWriterCycleConfig<T> getCycleConfig() {
- return cycleConfig;
- }
-
- /**
- * Get the policy's retention configuration
- * @return the retention configuration
- */
- public FileWriterRetentionConfig getRetentionConfig() {
- return retentionConfig;
- }
-
- @Override
- public void initialize(String basePathname, Flushable flushable,
- Closeable closeable) {
- this.basePathname = basePathname;
- this.flushable = flushable;
- this.closeable = closeable;
- Path basePath = new File(basePathname).toPath();
- this.parent = basePath.getParent();
- this.baseLeafname = basePath.getFileName().toString();
-
- if (flushConfig.getPeriodMsec() > 0) {
- long periodMsec = flushConfig.getPeriodMsec();
- getExecutor().scheduleAtFixedRate(
- () -> { try { this.flushable.flush(); }
- catch (IOException e) { /*ignore*/ }
- },
- periodMsec, periodMsec, TimeUnit.MILLISECONDS);
- }
- if (cycleConfig.getPeriodMsec() > 0) {
- long periodMsec = cycleConfig.getPeriodMsec();
- getExecutor().scheduleAtFixedRate(
- () -> { try { this.closeable.close(); }
- catch (IOException e) { /*ignore*/ }
- },
- periodMsec, periodMsec, TimeUnit.MILLISECONDS);
- }
- if (retentionConfig.getAgeSec() > 0) {
- long periodMsec = retentionConfig.getPeriodMsec();
- getExecutor().scheduleAtFixedRate(
- () -> applyTimeBasedRetention(),
- periodMsec, periodMsec, TimeUnit.MILLISECONDS);
- }
- }
-
- private ScheduledExecutorService getExecutor() {
- if (executor == null) {
- executor = Executors.newSingleThreadScheduledExecutor();
- }
- return executor;
- }
-
- @Override
- public void wrote(T tuple, long nbytes) {
- curSize += nbytes;
- curTupleCnt++;
- flushIt = flushConfig.evaluate(curTupleCnt, tuple);
- cycleIt = cycleConfig.evaluate(curSize, curTupleCnt, tuple);
- }
-
- @Override
- public boolean shouldFlush() {
- boolean b = flushIt;
- flushIt = false;
- return b;
- }
-
- @Override
- public boolean shouldCycle() {
- boolean b = cycleIt;
- cycleIt = false;
- return b;
- }
-
- @Override
- public Path getNextActiveFilePath() {
- Path path = hookGenerateNextActiveFilePath();
- trace.trace("next active file path={}", path);
- return path;
- }
-
- @Override
- public synchronized Path closeActiveFile(Path path) throws IOException {
- int tmpCurTupleCnt = curTupleCnt;
- resetActiveFileInfo();
- Path finalPath = hookGenerateFinalFilePath(path);
- trace.trace("closing active file nTuples={}, finalPath={}", tmpCurTupleCnt, finalPath);
- hookRenameFile(path, finalPath);
- retainedPaths.add(finalPath);
- applyRetention();
- return finalPath;
- }
-
-    /** Clear the per-active-file counters and any pending flush/cycle requests. */
-    private void resetActiveFileInfo() {
-        flushIt = false;
-        cycleIt = false;
-        curTupleCnt = 0;
-        curSize = 0;
-    }
-
- private synchronized void applyRetention() {
- long aggregateFileSize = 0; // compute when enabled
- if (retentionConfig.getAggregateFileSize() > 0) {
- for (Path path : retainedPaths) {
- File file = path.toFile();
- aggregateFileSize += file.length(); // 0 if doesn't exist
- }
- }
-
- if (retentionConfig.evaluate(retainedPaths.size(), aggregateFileSize)) {
- Path oldestPath = retainedPaths.remove(0);
- File file = oldestPath.toFile();
- trace.info("deleting file {}", file);
- file.delete();
- }
- }
-
- private synchronized void applyTimeBasedRetention() {
- long now = System.currentTimeMillis();
- long minTime = now - TimeUnit.SECONDS.toMillis(retentionConfig.getAgeSec());
- ArrayList<Path> toDelete = new ArrayList<>();
- for (Path path : retainedPaths) { // oldest first
- File file = path.toFile();
- if (file.lastModified() < minTime)
- toDelete.add(path);
- else
- break;
- }
- for (Path path : toDelete) {
- trace.info("deleting file {}", path);
- path.toFile().delete();
- }
- retainedPaths.removeAll(toDelete);
- }
-
-    /**
-     * Format the current time as {@code yyyyMMdd_HHmmss} for use in final
-     * file names.
-     *
-     * @return the formatted timestamp
-     */
-    private String ymdhms() {
-        // "yyyy" is the calendar year. The previous "YYYY" pattern is the
-        // week year, which differs from the calendar year for some dates
-        // around January 1st and produced misdated file names.
-        return new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
-    }
-
- /**
- * Generate the final file path for the active file.
- * <p>
- * The default implementation yields:
- * <br>
- * final file names are {@code basePathname_YYYYMMDD_HHMMSS[_<n>]}
- * where the optional {@code _<n>} suffix is only present if needed
- * to distinguish a file from the previously finalized file.
- * {@code <n>} starts at 1 and is monitonically incremented.
- * <p>
- * This hook method can be overridden.
- * <p>
- * Note, the implementation must handle the unlikely, but happens
- * in tests, case where files are cycling very fast (multiple per sec)
- * and the retention config tosses some within that same second.
- * I.e., avoid generating final path sequences like:
- * <pre>
- * leaf_YYYYMMDD_103099
- * leaf_YYYYMMDD_103099_1
- * leaf_YYYYMMDD_103099_2
- * delete leaf_YYYYMMDD_103099 -- retention cnt was 2
- * leaf_YYYYMMDD_103099 // should be _3
- * </pre>
- *
- * @param path the active file path to finalize
- * @return final path for the file
- */
- protected Path hookGenerateFinalFilePath(Path path) {
- String ymdhms = ymdhms();
- if (ymdhms.equals(lastYmdhms)) {
- lastMinorSuffix++;
- }
- else {
- lastMinorSuffix = 0;
- lastYmdhms = ymdhms;
- }
- String pathStr = String.format("%s_%s", basePathname, ymdhms);
- String finalPathStr = pathStr;
- if (lastMinorSuffix > 0)
- finalPathStr += "_" + lastMinorSuffix;
- return new File(finalPathStr).toPath();
- }
-
- /**
- * Generate the path for the next active file.
- * <p>
- * The default implementation yields {@code parent/.baseLeafname}
- * from {@code basePathname}.
- * <p>
- * This hook method can be overridden.
- * <p>
- * See {@link IFileWriterPolicy#getNextActiveFilePath()} regarding
- * constraints.
- *
- * @return path to use for the next active file.
- */
- protected Path hookGenerateNextActiveFilePath() {
- return parent.resolve("." + baseLeafname);
- }
-
-    /**
-     * "Rename" the active file to the final path.
-     * <p>
-     * The default implementation uses {@code java.io.File.renameTo()}
-     * and works for the default {@link #hookGenerateNextActiveFilePath()}
-     * and {@link #hookGenerateFinalFilePath(Path path)} implementations.
-     * <p>
-     * This hook method can be overridden.
-     *
-     * @param activePath path of the active file
-     * @param finalPath path to the final destination
-     * @throws IOException if the file could not be renamed
-     */
-    protected void hookRenameFile(Path activePath, Path finalPath) throws IOException {
-        trace.info("finalizing to {}", finalPath);
-        // File.renameTo() reports failure through its return value only;
-        // surface it so a failed rename is not silently ignored.
-        if (!activePath.toFile().renameTo(finalPath.toFile())) {
-            throw new IOException("unable to rename " + activePath + " to " + finalPath);
-        }
-    }
-
-    @Override
-    public String toString() {
-        // Summarize the base pathname and the three configs this policy uses.
-        return "basePathname:" + basePathname
-                + " [retention: " + retentionConfig.toString() + "]"
-                + " [cycle: " + cycleConfig.toString() + "]"
-                + " [flush: " + flushConfig.toString() + "]";
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
deleted file mode 100644
index 3cc51a4..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-/**
- * FileWriter finalized (non-active) file retention configuration control.
- * <p>
- * File removal can be any combination of:
- * <ul>
- * <li>remove a file when {@code fileCount} would be exceeded</li>
- * <li>remove a file when {@code aggregateFileSize} would be exceeded</li>
- * <li>remove a file that's older than {@code ageSec} seconds</li>
- * </ul>
- * <p>
- * Instances are immutable.
- */
-public class FileWriterRetentionConfig {
-    private final int fileCount;
-    private final long aggregateFileSize;
-    private final long ageSec;
-    private final long periodMsec;
-
-    /**
-     * Same as {@code newConfig(fileCount, 0, 0, 0)}.
-     *
-     * @param fileCount remove a file when {@code fileCount} would be exceeded.
-     *        Must be at least 1.
-     * @return the retention config
-     * @throws IllegalArgumentException if {@code fileCount} is less than 1
-     */
-    public static FileWriterRetentionConfig newFileCountBasedConfig(int fileCount) {
-        if (fileCount < 1)
-            throw new IllegalArgumentException("fileCount");
-        return newConfig(fileCount, 0, 0, 0);
-    }
-
-    /**
-     * Same as {@code newConfig(0, aggregateFileSize, 0, 0)}.
-     *
-     * @param aggregateFileSize remove a file when {@code aggregateFileSize}
-     *        would be exceeded. Must be at least 1.
-     * @return the retention config
-     * @throws IllegalArgumentException if {@code aggregateFileSize} is less than 1
-     */
-    public static FileWriterRetentionConfig newAggregateFileSizeBasedConfig(long aggregateFileSize) {
-        if (aggregateFileSize < 1)
-            throw new IllegalArgumentException("aggregateFileSize");
-        return newConfig(0, aggregateFileSize, 0, 0);
-    }
-
-    /**
-     * Same as {@code newConfig(0, 0, ageSec, periodMsec)}.
-     *
-     * @param ageSec remove a file that's older than {@code ageSec} seconds.
-     *        Must be at least 1.
-     * @param periodMsec frequency for checking for ageSec based removal.
-     *        Must be at least 1.
-     * @return the retention config
-     * @throws IllegalArgumentException if either argument is less than 1
-     */
-    public static FileWriterRetentionConfig newAgeBasedConfig(long ageSec, long periodMsec) {
-        if (ageSec < 1)
-            throw new IllegalArgumentException("ageSec");
-        if (periodMsec < 1)
-            throw new IllegalArgumentException("periodMsec");
-        return newConfig(0, 0, ageSec, periodMsec);
-    }
-
-    /**
-     * Create a new configuration.
-     * <p>
-     * At least one retention criterion must be enabled: a non-zero
-     * {@code fileCount}, a non-zero {@code aggregateFileSize}, or both a
-     * non-zero {@code ageSec} and a non-zero {@code periodMsec}.
-     *
-     * @param fileCount remove a file when {@code fileCount} would be exceeded. 0 to disable.
-     * @param aggregateFileSize remove a file when {@code aggregateFileSize} would be exceeded. 0 to disable.
-     * @param ageSec remove a file that's older than {@code ageSec} seconds. 0 to disable.
-     * @param periodMsec frequency for checking for ageSec based removal. 0 to disable.
-     * @return the retention config
-     * @throws IllegalArgumentException if any value is negative or no
-     *         retention criterion is enabled
-     */
-    public static FileWriterRetentionConfig newConfig(int fileCount, long aggregateFileSize, long ageSec, long periodMsec) {
-        return new FileWriterRetentionConfig(fileCount, aggregateFileSize, ageSec, periodMsec);
-    }
-
-    private FileWriterRetentionConfig(int fileCount, long aggregateFileSize, long ageSec, long periodMsec) {
-        if (fileCount < 0)
-            throw new IllegalArgumentException("fileCount");
-        if (aggregateFileSize < 0)
-            throw new IllegalArgumentException("aggregateFileSize");
-        if (ageSec < 0)
-            throw new IllegalArgumentException("ageSec");
-        if (periodMsec < 0)
-            throw new IllegalArgumentException("periodMsec");
-        // Age based retention needs both an age and a checking period to be usable.
-        if (fileCount==0 && aggregateFileSize==0 && (ageSec==0 || periodMsec==0))
-            throw new IllegalArgumentException("no retention configuration specified");
-        this.fileCount = fileCount;
-        this.aggregateFileSize = aggregateFileSize;
-        this.ageSec = ageSec;
-        this.periodMsec = periodMsec;
-    }
-
-    /**
-     * Get the file count configuration value.
-     * @return the value
-     */
-    public int getFileCount() { return fileCount; }
-
-    /**
-     * Get the aggregate file size configuration value.
-     * @return the value
-     */
-    public long getAggregateFileSize() { return aggregateFileSize; }
-
-    /**
-     * Get the file age configuration value.
-     * @return the value
-     */
-    public long getAgeSec() { return ageSec; }
-
-    /**
-     * Get the time period configuration value.
-     * @return the value
-     */
-    public long getPeriodMsec() { return periodMsec; }
-
-    /**
-     * Evaluate if the specified values indicate that a final file should
-     * be removed.
-     * <p>
-     * Only the file count and aggregate size criteria are evaluated here;
-     * a criterion whose configured value is 0 is ignored.
-     *
-     * @param fileCount the current number of retained files
-     * @param aggregateFileSize the aggregate size of all of the retained files
-     * @return true if a retained file should be removed.
-     */
-    public boolean evaluate(int fileCount, long aggregateFileSize) {
-        return (this.fileCount > 0 && fileCount > this.fileCount)
-                || (this.aggregateFileSize > 0 && aggregateFileSize > this.aggregateFileSize);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("fileCount:%d aggSize:%d ageSec:%d",
-                getFileCount(), getAggregateFileSize(), getAgeSec());
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/package-info.java b/connectors/file/src/main/java/edgent/connectors/file/package-info.java
deleted file mode 100644
index 5c11884..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * File stream connector.
- * <p>
- * Stream tuples may be written to files
- * and created by reading from files.
- */
-package edgent.connectors.file;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
deleted file mode 100644
index 970f8df..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-/**
- * Generic base class for writing tuples to a file.
- * <p>
- * It tracks the file's path, the number of bytes written and the number of
- * tuples written. It is not responsible for flush strategy, finalize
- * strategy, etc.
- *
- * @param <T> tuple type
- */
-abstract class AbstractWriterFile<T> {
-    private final Path path;
-    /** Running total of the byte counts returned by {@link #writeTuple(Object)}. */
-    protected long size;
-    private long tupleCnt;
-
-    public AbstractWriterFile(Path path) {
-        this.path = path;
-    }
-
-    /** @return the path of the file being written */
-    public Path path() {
-        return path;
-    }
-
-    /** @return the number of bytes accounted for so far */
-    public long size() {
-        return size;
-    }
-
-    /** @return the number of tuples passed to {@link #write(Object)} so far */
-    public long tupleCnt() {
-        return tupleCnt;
-    }
-
-    public abstract void flush() throws IOException;
-
-    public abstract void close() throws IOException;
-
-    /**
-     * Do what's needed to write the tuple.
-     *
-     * @param tuple the tuple to write
-     * @return the number of bytes written
-     * @throws IOException on failure
-     */
-    protected abstract int writeTuple(T tuple) throws IOException;
-
-    /**
-     * Write the tuple and update the tuple and size counters.
-     *
-     * @param tuple the tuple to write
-     * @return the number of bytes written
-     * @throws IOException on failure
-     */
-    public int write(T tuple) throws IOException {
-        // The tuple is counted before the write is attempted.
-        tupleCnt += 1;
-        final int written = writeTuple(tuple);
-        size = size + written;
-        return written;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
deleted file mode 100644
index d0fc100..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
-import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
-import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-
-import edgent.function.Supplier;
-
-/**
- * Watch a directory for files being added to it and create a stream
- * of pathname strings for the files.
- * <p>
- * Hidden files (files starting with ".") are ignored.
- * <p>
- * The order of the files in the stream is dictated by a {@link Comparator}.
- * The default comparator orders files by {@link File#lastModified()} values.
- * There are no guarantees on the processing order of files that
- * have the same lastModified value.
- * Note, lastModified values are subject to filesystem timestamp
- * quantization - e.g., 1 second.
- * <p>
- * Note: due to the asynchronous nature of things, if files in the
- * directory may be removed, the receiver of a tuple with a "new" file
- * pathname may need to be prepared for the pathname to no longer be
- * valid when it receives the tuple or during its processing of the tuple.
- * <p>
- * The behavior on MacOS may be unsavory, even as recent as Java8, as
- * MacOS Java lacks a native implementation of {@link WatchService}.
- * The result can be a delay in detecting newly created files (e.g., 10sec)
- * as well as not detecting rapid deletion and recreation of a file.
- * See:
- * http://stackoverflow.com/questions/9588737/is-java-7-watchservice-slow-for-anyone-else
- */
-public class DirectoryWatcher implements AutoCloseable,
-        FileFilter, Iterable<String> {
-
-    private static final Logger trace = FileConnector.getTrace();
-    private final Supplier<String> dirSupplier;
-    private final Comparator<File> comparator;
-    // leaf names already submitted; entries are dropped when a delete is observed
-    private final Set<String> seenFiles = Collections.synchronizedSet(new HashSet<>());
-    private volatile File dirFile;
-    // created by initialize(), i.e. not until iterator() is called
-    private WatchService watcher;
-
-    private final Queue<String> pendingNames = new LinkedList<>();
-
-
-    /**
-     * Watch the specified directory and generate tuples corresponding
-     * to files that are created in the directory.
-     * <p>
-     * If a null {@code comparator} is specified, the default comparator
-     * described in {@link DirectoryWatcher} is used.
-     *
-     * @param dirSupplier the directory to watch
-     * @param comparator a comparator to order the processing of
-     *        multiple newly seen files in the directory. may be null.
-     */
-    public DirectoryWatcher(Supplier<String> dirSupplier, Comparator<File> comparator) {
-        this.dirSupplier = dirSupplier;
-        if (comparator == null) {
-            comparator = // TODO 2nd order alphanumeric compare when same LMT?
-                    (o1,o2) -> Long.compare(o1.lastModified(),
-                            o2.lastModified());
-        }
-        this.comparator = comparator;
-    }
-
-    /**
-     * Resolve the directory, register it with a new watch service and
-     * submit the files that are already present.
-     */
-    private void initialize() throws IOException {
-        dirFile = new File(dirSupplier.get());
-
-        trace.info("watching directory {}", dirFile);
-
-        Path dir = dirFile.toPath();
-
-        watcher = FileSystems.getDefault().newWatchService();
-        dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE);
-
-        sortAndSubmit(listAcceptedFiles());
-    }
-
-    /**
-     * List the directory's files that pass {@link #accept(File)}.
-     *
-     * @return the accepted files; empty if the directory cannot be listed
-     */
-    private List<File> listAcceptedFiles() {
-        // File.listFiles() returns null, rather than an empty array, when the
-        // pathname is not a directory or an I/O error occurs.
-        File[] found = dirFile.listFiles(this);
-        if (found == null) {
-            return Collections.emptyList();
-        }
-        return Arrays.asList(found);
-    }
-
-    /**
-     * Close the underlying watch service, if one was created.
-     */
-    @Override
-    public void close() throws IOException {
-        // Nothing to release if iterator() was never called.
-        if (watcher != null) {
-            watcher.close();
-        }
-    }
-
-    /**
-     * Order the files with the comparator and queue the pathnames of those
-     * that are still acceptable and still exist.
-     *
-     * @param files candidate files; sorted in place when it has more than one entry
-     */
-    protected void sortAndSubmit(List<File> files) {
-        if (files.size() > 1) {
-            Collections.sort(files, comparator);
-        }
-
-        for (File file : files) {
-            if (accept(file) && file.exists()) {
-                pendingNames.add(file.getAbsolutePath());
-                seenFiles.add(file.getName());
-            }
-        }
-    }
-
-    /**
-     * Waits for files to become available
-     * and adds them through {@link #sortAndSubmit(List)}
-     * to the pendingNames list which the iterator pulls from.
-     */
-    @SuppressWarnings("unchecked")
-    private void watchForFiles() throws Exception {
-
-        WatchKey key = watcher.take();
-
-        List<File> newFiles = new ArrayList<>();
-        boolean needFullScan = false;
-        for (WatchEvent<?> watchEvent : key.pollEvents()) {
-
-            if (ENTRY_CREATE == watchEvent.kind()) {
-                Path newPath = ((WatchEvent<Path>) watchEvent).context();
-                File newFile = toAbsFile(newPath);
-                if (accept(newFile))
-                    newFiles.add(newFile);
-            } else if (ENTRY_DELETE == watchEvent.kind()) {
-                // forget the name so a file recreated with it is reported again
-                Path deletedPath = ((WatchEvent<Path>) watchEvent).context();
-                File deletedFile = toAbsFile(deletedPath);
-                seenFiles.remove(deletedFile.getName());
-            } else if (OVERFLOW == watchEvent.kind()) {
-                // events were lost; rescan the directory to catch up
-                needFullScan = true;
-            }
-        }
-        key.reset();
-
-        if (needFullScan) {
-            newFiles.addAll(listAcceptedFiles());
-        }
-        sortAndSubmit(newFiles);
-    }
-
-    private File toAbsFile(Path relPath) {
-        return new File(dirFile, relPath.getFileName().toString());
-    }
-
-    @Override
-    public boolean accept(File pathname) {
-        // our "filter" function
-        return !pathname.getName().startsWith(".")
-                && !seenFiles.contains(pathname.getName());
-    }
-
-    @Override
-    public Iterator<String> iterator() {
-        try {
-            initialize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return new WatcherIterator();
-    }
-
-    /*
-     * Iterator that returns the file names.
-     * It is endless for hasNext() always returns
-     * true, and next() will block in WatchService.take
-     * if no files are available.
-     */
-    private class WatcherIterator implements Iterator<String> {
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public String next() {
-
-            for (;;) {
-
-                String name = pendingNames.poll();
-                if (name != null)
-                    return name;
-
-                // blocks until a file appears
-                // note that even when watchForFiles()
-                // returns pendingNames might still be empty
-                // due to filtering.
-                try {
-                    watchForFiles();
-                } catch (InterruptedException e) {
-                    // interpret as shutdown
-                    // NOTE(review): the loop continues after the interrupt and
-                    // will block in take() again - confirm this is intended.
-                    trace.debug("Interrupted");
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
deleted file mode 100644
index 614a2b0..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Holder for the logger shared by the file connector runtime classes.
- */
-public class FileConnector {
-    // Instantiates the otherwise unused private constructor (code coverage).
-    @SuppressWarnings("unused")
-    private static final FileConnector forCodeCoverage = new FileConnector();
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileConnector.class);
-
-    private FileConnector() {
-        // not instantiable by clients
-    }
-
-    /**
-     * Get the file connector's logger.
-     *
-     * @return the logger
-     */
-    public static Logger getTrace() {
-        return LOGGER;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
deleted file mode 100644
index 98dff9c..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.nio.file.Path;
-
-import edgent.connectors.file.FileStreams;
-
-/**
- * An interface for file writer policies.
- * <p>
- * {@code IFileWriterPolicy} is for use by file writer implementations
- * for interacting with a policy implementation.
- * <p>
- * A policy generally implements strategies related to:
- * <ul>
- * <li>Active and final file pathname control.</li>
- * <li>Active file flush control</li>
- * <li>Active file cycle control (when to close/finalize the current active file)</li>
- * <li>file retention control</li>
- * </ul>
- * <p>
- * A file writer uses a {@code IFileWriterPolicy} in the following manner:
- * <pre>
- * IFileWriterPolicy<T> policy = some policy implementation
- * policy.initialize(basePathname, () -> myFlushFn(), () -> myCycleFn());
- * Path activeFilePath = null;
- * for each tuple {
- * if (activeFilePath == null) {
- * activeFilePath = policy.getNextActiveFilePath();
- * open an output stream to the path
- * }
- * write the appropriate contents to the active file output stream
- * policy.wrote(tuple, number of bytes written);
- * if (policy.shouldCycle()) {
- * close the active file output stream
- * policy.closeActiveFile(activeFilePath);
- * activeFilePath = null;
- * }
- * if (policy.shouldFlush()) {
- * flush the active file output stream
- * }
- * }
- * policy.close();
- *
- * void myFlushFn() {
- * flush the active file output stream
- * }
- *
- * void myCycleFn() {
- * close the active file output stream
- * policy.closeActiveFile(activeFilePath);
- * activeFilePath = null;
- * }
- * </pre>
- *
- * @param <T> stream tuple type
- */
-public interface IFileWriterPolicy<T> {
-
- /**
- * Initialize the policy with the base pathname of files to generate
- * and objects that can be
- * called to perform timer based flush or close (cycle) of the active file.
- * <p>
- * Cycling involves finalizing the active file (getting it to its
- * final destination / pathname) and applying any retention policy.
- * <p>
- * The supplied {@code closeable} must close the active file's output stream
- * and then call {@link #closeActiveFile(Path)}.
- * <p>
- * For non-timer based strategies, the file writer generally triggers
- * flush and cycle processing
- * following a tuple write as informed by {@link #shouldCycle()} and
- * {@link #shouldFlush()}.
- *
- * @param basePathname the directory and base leafname for final files
- * @param flushable function to perform the flush
- * @param closeable function to perform the close
- */
- void initialize(String basePathname, Flushable flushable, Closeable closeable);
-
- /**
- * Inform the policy of every tuple written to the active file.
- * <p>
- * The policy can use this to update its state so that it
- * can answer the questions {@link #shouldFlush()}
- * and {@link #shouldCycle()} for count, size, or
- * tuple attribute based policies.
- * <p>
- * The policy can also use this to update its state
- * for implementing time based flush and cycle policies.
- * @param tuple the tuple written
- * @param nbytes the number of bytes written
- */
- void wrote(T tuple, long nbytes);
-
- /**
- * Answers the question "should the active file be flushed?".
- * <p>
- * The state is reset to false after this returns.
- * @return true if the active file should be flushed
- */
- boolean shouldFlush();
-
- /**
- * Answers the question "should the active file be cycled?".
- * <p>
- * The state is reset to false after this returns.
- * @return true if the active file should be cycled
- */
- boolean shouldCycle();
-
- /**
- * Return the path for the next active file to write to.
- * <p>
- * If there was a current active file, {@link #closeActiveFile(Path)}
- * must be called prior to this.
- * <p>
- * The leafname must be a hidden file ({@code java.io.File.isHidden()==true})
- * to be compatible with a directory watcher
- * {@link FileStreams#directoryWatcher(edgent.topology.TopologyElement, edgent.function.Supplier)}.
- *
- * @return path for the active file
- */
- Path getNextActiveFilePath();
-
- /**
- * Close the active file {@code path}.
- * <p>
- * Generate the final path for the active file and
- * rename/move/copy it as necessary to be at that final path.
- * <p>
- * Apply the retention policy.
- * <p>
- * The active file's writer iostream must be closed prior to calling this.
- *
- * @param path the active file (from {@link #getNextActiveFilePath()}).
- * @return the final path
- * @throws IOException on failure
- */
- Path closeActiveFile(Path path) throws IOException;
-
- /**
- * Release any resources utilized by this policy.
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
deleted file mode 100644
index a7efc94..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-/**
- * Writes {@code String} tuples to a file, one tuple per line, using the
- * given charset. The file is created lazily on the first write.
- */
-class StringWriterFile extends AbstractWriterFile<String> {
-    private static Logger trace = FileConnector.getTrace();
-    private BufferedWriter bw;
-    private final Charset cs;
-
-    public StringWriterFile(Path path, Charset cs) {
-        super(path);
-        this.cs = cs;
-    }
-
-    /**
-     * Append the tuple followed by a newline, opening the file first if needed.
-     *
-     * @return the encoded length of the tuple plus one for the newline
-     */
-    @Override
-    protected int writeTuple(String tuple) throws IOException {
-        if (bw == null) {
-            trace.info("creating file {}", path());
-            bw = Files.newBufferedWriter(path(), cs);
-        }
-        bw.write(tuple);
-        bw.write("\n");
-        // ugh. inefficient: the tuple is encoded a second time just to size it
-        final int encodedLength = tuple.getBytes(cs).length;
-        return encodedLength + 1;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        if (bw == null) {
-            return; // nothing has been written yet
-        }
-        trace.trace("flushing {}", path());
-        bw.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        final BufferedWriter writer = this.bw;
-        if (writer == null) {
-            return; // never opened, or already closed
-        }
-        trace.info("closing {}", path());
-        // Drop the reference first so the field is cleared even if close() throws.
-        this.bw = null;
-        writer.close();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
deleted file mode 100644
index 6682a51..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-/**
- * Pipe that treats each input tuple as a file pathname, reads that file as
- * text and submits every line to the destination.
- * Optional "pre" and "post" functions may each contribute one extra tuple,
- * submitted before and after the file's lines respectively.
- */
-public class TextFileReader extends Pipe<String,String> {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger trace = FileConnector.getTrace();
-    private volatile String encoding = "UTF-8";
-    private volatile Charset charset;
-    private volatile boolean shutdown;
-    private volatile Function<String,String> preFn = path -> null;
-    private volatile BiFunction<String,Exception,String> postFn = (path,exc) -> null;
-
-    /**
-     * Sets the function applied to the pathname before the file is read.
-     * A non-null result is submitted as a tuple; {@code null} installs a no-op.
-     */
-    public void setPre(Function<String,String> preFn) {
-        this.preFn = (preFn != null) ? preFn : (path -> null);
-    }
-
-    /**
-     * Sets the function applied to the pathname (and the read failure, if any)
-     * after the file is read.
-     * A non-null result is submitted as a tuple; {@code null} installs a no-op.
-     */
-    public void setPost(BiFunction<String,Exception,String> postFn) {
-        this.postFn = (postFn != null) ? postFn : ((path,exc) -> null);
-    }
-
-    @Override
-    public synchronized void initialize(OpletContext<String,String> context) {
-        super.initialize(context);
-
-        charset = Charset.forName(encoding);
-    }
-
-    /** Submits the "pre" function's result for the pathname, if it yields one. */
-    private void emitPre(String pathname, Consumer<String> dst) {
-        String head = preFn.apply(pathname);
-        if (head != null) {
-            dst.accept(head);
-        }
-    }
-
-    /** Submits the "post" function's result for the pathname, if it yields one. */
-    private void emitPost(String pathname, Exception e, Consumer<String> dst) {
-        String tail = postFn.apply(pathname, e);
-        if (tail != null) {
-            dst.accept(tail);
-        }
-    }
-
-    /**
-     * Reads the named file line by line and submits each line.
-     * An {@code IOException} is logged and handed to the "post" function
-     * rather than propagated.
-     */
-    @Override
-    public void accept(String pathname) {
-        trace.trace("reading path={}", pathname);
-        final Consumer<String> downstream = getDestination();
-        emitPre(pathname, downstream);
-        final Path file = new File(pathname).toPath();
-        Exception failure = null;
-        int lineCount = 0;
-        try (BufferedReader reader = Files.newBufferedReader(file, charset)) {
-            // The shutdown flag is only consulted before every 10th line.
-            String line;
-            while (!(lineCount % 10 == 0 && shutdown)
-                    && (line = reader.readLine()) != null) {
-                lineCount++;
-                downstream.accept(line);
-            }
-        }
-        catch (IOException e) {
-            trace.error("Error processing file '{}'", pathname, e);
-            failure = e;
-        }
-        finally {
-            trace.trace("done reading nlines={} path={} ", lineCount, pathname);
-            emitPost(pathname, failure, downstream);
-        }
-    }
-
-    /** Requests that any in-progress read stop at its next shutdown check. */
-    @Override
-    public void close() throws Exception {
-        shutdown = true;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
deleted file mode 100644
index ab67ed5..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-
-/**
- * {@code Consumer} that writes each accepted string to the current active
- * {@link StringWriterFile}. An {@link IFileWriterPolicy} supplies the path of
- * each new active file and decides when the active file is flushed and when
- * it is closed (cycled).
- */
-public class TextFileWriter implements Consumer<String>, AutoCloseable {
-    private static final long serialVersionUID = 1L;
-    static final Logger trace = FileConnector.getTrace();
-    private volatile String encoding = "UTF-8";
-    private volatile Charset charset;
-    private final Supplier<String> basePathname;
-    private final Supplier<IFileWriterPolicy<String>> policyFn;
-    private volatile boolean initialized;
-    private volatile IFileWriterPolicy<String> policy;
-    private StringWriterFile activeFile;
-
-    private String getEncoding() {
-        return encoding;
-    }
-
-    /**
-     * @param basePathname supplies the base pathname handed to the policy on first use
-     * @param policy supplies the writer policy; it is obtained lazily, on first need
-     */
-    public TextFileWriter(Supplier<String> basePathname, Supplier<IFileWriterPolicy<String>> policy) {
-        this.basePathname = basePathname;
-        this.policyFn = policy;
-        charset = Charset.forName(getEncoding());
-    }
-
-    /** Returns the policy, obtaining it from the supplier on first call. */
-    private IFileWriterPolicy<String> getPolicy() {
-        if (policy == null) {
-            policy = policyFn.get();
-        }
-        return policy;
-    }
-
-    /**
-     * Initializes the policy with the base pathname and with callbacks that
-     * let it flush or close the active file.
-     */
-    private void initialize() {
-        getPolicy().initialize(basePathname.get(),
-                () -> flushActiveFile(),
-                () -> closeActiveFile());
-        initialized = true;
-        trace.info("writer policy: {}", getPolicy());
-    }
-
-    /** Flushes the active file, if any; a flush failure is only traced. */
-    private synchronized void flushActiveFile() {
-        if (activeFile != null) {
-            try {
-                activeFile.flush();
-            } catch (IOException e) {
-                trace.trace("flush of {} failed", activeFile.path(), e);
-            }
-        }
-    }
-
-    /** Writes the line, initializing the policy on the first call. */
-    @Override
-    public void accept(String line) {
-        if (!initialized)
-            initialize();
-        writeLine(line);
-    }
-
-    /**
-     * Writes one line to the active file (creating it if needed), reports the
-     * write to the policy, then cycles or flushes the file if the policy asks.
-     * An {@code IOException} is logged, not propagated.
-     */
-    private void writeLine(String line) {
-        // prevent async time based cycle or flush while writing the tuple
-        synchronized(this) {
-            try {
-                if (activeFile == null) {
-                    newActiveFile();
-                }
-                int nbytes = activeFile.write(line);
-                getPolicy().wrote(line, nbytes);
-            }
-            catch (IOException e) {
-                // newActiveFile() is declared to throw IOException; if it did,
-                // activeFile is still null and must not be dereferenced here,
-                // or an NPE would hide the real failure.
-                StringWriterFile failed = activeFile;
-                if (failed != null) {
-                    trace.error("Error writing tuple {} of length {} to {}",
-                            failed.tupleCnt(), line.length(), failed.path(), e);
-                }
-                else {
-                    trace.error("Error writing tuple of length {}: unable to create active file",
-                            line.length(), e);
-                }
-            }
-        }
-        if (getPolicy().shouldCycle()) {
-            closeActiveFile();
-        }
-        else if (getPolicy().shouldFlush()) {
-            flushActiveFile();
-        }
-    }
-
-    /** Makes a new active file at the next path supplied by the policy. */
-    private synchronized void newActiveFile() throws IOException {
-        Path path = getPolicy().getNextActiveFilePath();
-        activeFile = new StringWriterFile(path, charset);
-    }
-
-    /**
-     * Closes the active file, if any, and hands its path to the policy's
-     * {@code closeActiveFile}. An {@code IOException} is logged, not propagated.
-     */
-    private synchronized void closeActiveFile() {
-        // Detach first so the field is cleared even if closing fails.
-        StringWriterFile closing = this.activeFile;
-        this.activeFile = null;
-        if (closing == null) {
-            return;
-        }
-        try {
-            closing.close();
-            getPolicy().closeActiveFile(closing.path());
-        }
-        catch (IOException e) {
-            trace.error("error closing active file '{}'", closing.path(), e);
-        }
-    }
-
-    /** Closes the active file, then the policy. */
-    @Override
-    public void close() throws Exception {
-        closeActiveFile();
-        getPolicy().close();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java b/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
new file mode 100644
index 0000000..1a6272c
--- /dev/null
+++ b/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.file;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * A {@link FileWriterPolicy} that generates zip compressed files.
+ * <P>
+ * {@code CompressedFileWriterPolicy} is used exactly like {@code FileWriterPolicy}.
+ * The generated file names are identical to those generated by {@code FileWriterPolicy}
+ * except they have a {@code .zip} suffix.
+ * </P>
+ * <P>
+ * The active file is uncompressed.
+ * It is compressed when cycled per the {@link FileWriterCycleConfig}.
+ * Hence, a {@link FileWriterCycleConfig#newFileSizeBasedConfig(long) file size based}
+ * cycle config specifies the size of the uncompressed active file.
+ * </P>
+ * <P>
+ * An {@link FileWriterRetentionConfig#newAggregateFileSizeBasedConfig(long) aggregate
+ * file size} based retention config specifies the total size of the
+ * retained compressed files.
+ * </P>
+ * Sample use:
+ * <pre>{@code
+ * // Create a CompressedFileWriterPolicy with the configuration:
+ * // no explicit flush; cycle the active file when it exceeds 200Kb;
+ * // retain up to 1Mb of compressed files.
+ * IFileWriterPolicy<String> policy = new CompressedFileWriterPolicy<String>(
+ *     FileWriterFlushConfig.newImplicitConfig(),
+ *     FileWriterCycleConfig.newFileSizeBasedConfig(200_000),
+ *     FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(1_000_000));
+ * String basePathname = "/some/directory/and_base_name";
+ *
+ * TStream<String> streamToWrite = ...
+ * FileStreams.textFileWriter(streamToWrite, () -> basePathname, () -> policy)
+ * }</pre>
+ *
+ * @param <T> stream tuple type
+ */
+public class CompressedFileWriterPolicy<T> extends FileWriterPolicy<T> {
+
+    private final static String SUFFIX = ".zip";
+    private final static int BUFSIZE = 8192;
+
+    public CompressedFileWriterPolicy() {
+        super();
+    }
+
+    public CompressedFileWriterPolicy(FileWriterFlushConfig<T> flushConfig,
+            FileWriterCycleConfig<T> cycleConfig,
+            FileWriterRetentionConfig retentionConfig) {
+        super(flushConfig, cycleConfig, retentionConfig);
+    }
+
+    /**
+     * Generates the final file path: the superclass's final path with
+     * {@code .zip} appended to its file name.
+     */
+    @Override
+    protected Path hookGenerateFinalFilePath(Path path) {
+        // finalPath = the normal finalPath + SUFFIX
+        Path finalPath = super.hookGenerateFinalFilePath(path);
+        // resolveSibling() also works when finalPath has no parent component,
+        // a case in which getParent() returns null.
+        return finalPath.resolveSibling(finalPath.getFileName() + SUFFIX);
+    }
+
+    /**
+     * Compresses the active file into {@code finalPath} and then deletes the
+     * active file, instead of renaming it.
+     *
+     * @throws IOException if the compression fails
+     */
+    @Override
+    protected void hookRenameFile(Path activePath, Path finalPath) throws IOException {
+        // compress into finalPath instead of simple rename
+        assert finalPath.toString().endsWith(SUFFIX) : finalPath.toString();
+        compressFile(activePath, finalPath);
+        // the result of delete() is not checked
+        activePath.toFile().delete();
+    }
+
+    /**
+     * Writes {@code src} into a new zip file {@code dst} as a single entry
+     * named after {@code dst}'s file name without the {@code .zip} suffix.
+     *
+     * @param src file to compress
+     * @param dst zip file to create; its file name is expected to end with the suffix
+     * @throws IOException if reading {@code src} or writing {@code dst} fails
+     */
+    protected void compressFile(Path src, Path dst) throws IOException {
+        try (
+            BufferedInputStream in = new BufferedInputStream(
+                    new FileInputStream(src.toFile()), BUFSIZE);
+            ZipOutputStream out = new ZipOutputStream(
+                    new BufferedOutputStream(new FileOutputStream(dst.toFile())));
+            )
+        {
+            // zip file entry name is "dst" minus the suffix.
+            String dstFileName = dst.getFileName().toString();
+            String entryName = dstFileName.substring(0, dstFileName.length() - SUFFIX.length());
+
+            out.putNextEntry(new ZipEntry(entryName));
+            byte[] data = new byte[BUFSIZE];
+            int count;
+            while ((count = in.read(data, 0, BUFSIZE)) != -1) {
+                out.write(data, 0, count);
+            }
+        }
+
+    }
+
+}