You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:32 UTC

[31/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java b/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
deleted file mode 100644
index 8afc6cc..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileStreams.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import java.io.File;
-import java.nio.file.WatchService;
-import java.util.Comparator;
-
-import edgent.connectors.file.runtime.DirectoryWatcher;
-import edgent.connectors.file.runtime.IFileWriterPolicy;
-import edgent.connectors.file.runtime.TextFileReader;
-import edgent.connectors.file.runtime.TextFileWriter;
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.Supplier;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * {@code FileStreams} is a connector for integrating with file system objects.
- * <p>
- * File stream operations include:
- * <ul>
- * <li>Write tuples to text files - {@link #textFileWriter(TStream, Supplier, Supplier) textFileWriter}</li>
- * <li>Watch a directory for new files - {@link #directoryWatcher(TopologyElement, Supplier) directoryWatcher}</li>
- * <li>Create tuples from text files - {@link #textFileReader(TStream, Function, BiFunction) textFileReader}</li>
- * </ul>
- */
-public class FileStreams {
-    @SuppressWarnings("unused")
-    private static final FileStreams forCodeCoverage = new FileStreams();
-    private FileStreams() {};
-    
-    /**
-     * Declare a stream containing the absolute pathname of 
-     * newly created file names from watching {@code directory}.
-     * <p>
-     * This is the same as {@code directoryWatcher(t, () -> dir, null)}.
-     * 
-     * @param te topology element whose topology the watcher will be added to
-     * @param directory
-     *            Name of the directory to watch.
-     * @return Stream containing absolute pathnames of newly created files in
-     *            {@code directory}.
-     */
-    public static TStream<String> directoryWatcher(TopologyElement te,
-            Supplier<String> directory) {
-        return directoryWatcher(te, directory, null);
-    }
-    
-    /**
-     * Declare a stream containing the absolute pathname of 
-     * newly created file names from watching {@code directory}.
-     * <p>
-     * Hidden files (java.io.File.isHidden()==true) are ignored.
-     * This is compatible with {@code textFileWriter}.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * String dir = "/some/directory/path";
-     * Topology t = ...
-     * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir, null);
-     * }</pre>
-     * <p>
-     * The order of the files in the stream is dictated by a {@link Comparator}.
-     * The default comparator orders files by {@link File#lastModified()} values.
-     * There are no guarantees on the processing order of files that
-     * have the same lastModified value.
-     * Note, lastModified values are subject to filesystem timestamp
-     * quantization - e.g., 1second.
-     * <p>
-     * Note: due to the asynchronous nature of things, if files in the
-     * directory may be removed, the receiver of a tuple with a "new" file
-     * pathname may need to be prepared for the pathname to no longer be
-     * valid when it receives the tuple or during its processing of the tuple.
-     * <p>
-     * The behavior on MacOS may be unsavory, even as recent as Java8, as
-     * MacOs Java lacks a native implementation of {@link WatchService}.
-     * The result can be a delay in detecting newly created files (e.g., 10sec)
-     * as well not detecting rapid deletion and recreation of a file.
-     *
-     * @param te topology element whose topology the watcher will be added to
-     * @param directory
-     *            Name of the directory to watch.
-     * @param comparator
-     *            Comparator to use to order newly seen file pathnames.
-     *            May be null.
-     * @return Stream containing absolute pathnames of newly created files in
-     *            {@code directory}.
-     */
-    public static TStream<String> directoryWatcher(TopologyElement te,
-            Supplier<String> directory, Comparator<File> comparator) {
-        return te.topology().source(() -> new DirectoryWatcher(directory, comparator));
-    }
-    
-    /**
-     * Declare a stream containing the lines read from the files
-     * whose pathnames correspond to each tuple on the {@code pathnames}
-     * stream.
-     * <p>
-     * This is the same as {@code textFileReader(pathnames, null, null)}
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * String dir = "/some/directory/path";
-     * Topology t = ...
-     * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir);
-     * TStream<String> contents = FileStreams.textFileReader(pathnames);
-     * contents.print();
-     * }</pre>
-     * 
-     * @param pathnames
-     *            Stream containing pathnames of files to read.
-     * @return Stream containing lines from the files.
-     */
-    public static TStream<String> textFileReader(TStream<String> pathnames) {
-        return textFileReader(pathnames, null, null);
-    }
-
-    /**
-     * Declare a stream containing the lines read from the files
-     * whose pathnames correspond to each tuple on the {@code pathnames}
-     * stream.
-     * <p>
-     * All files are assumed to be encoded in UTF-8.  The lines are
-     * output in the order they appear in each file, with the first line of
-     * a file appearing first.  A file is not subsequently monitored for
-     * additional lines.
-     * <p>
-     * If a file can not be read, e.g., a file doesn't exist at that pathname
-     * or the pathname is for a directory,
-     * an error will be logged.
-     * <p>
-     * Optional {@code preFn} and {@code postFn} functions may be supplied.
-     * These are called prior to processing a tuple (pathname) and after
-     * respectively.  They provide a way to encode markers in the generated
-     * stream.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * // watch a directory for files, creating a stream with the contents of
-     * // each file.  Use a preFn to include a file separator marker in the
-     * // stream. Use a postFn to delete a file once it's been processed.
-     * String dir = "/some/directory/path";
-     * Topology t = ...
-     * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir);
-     * TStream<String> contents = FileStreams.textFileReader(
-     *              pathnames,
-     *              path -> { return "###<PATH-MARKER>### " + path },
-     *              (path,exception) -> { new File(path).delete(), return null; }
-     *              );
-     * contents.print();
-     * }</pre>
-     * 
-     * @param pathnames
-     *            Stream containing pathnames of files to read.
-     * @param preFn
-     *            Pre-visit {@code Function<String,String>}.
-     *            The input is the pathname.  
-     *            The result, when non-null, is added to the output stream.
-     *            The function may be null.
-     * @param postFn
-     *            Post-visit {@code BiFunction<String,Exception,String>}.
-     *            The input is the pathname and an exception.  The exception
-     *            is null if there were no errors.
-     *            The result, when non-null, is added to the output stream.
-     *            The function may be null.
-     * @return Stream containing lines from the files.
-     */
-    public static TStream<String> textFileReader(TStream<String> pathnames,
-        Function<String,String> preFn, BiFunction<String,Exception,String> postFn) {
-        
-        TextFileReader reader = new TextFileReader();
-        reader.setPre(preFn);
-        reader.setPost(postFn);
-        return pathnames.pipe(reader);
-    }
-    
-    /**
-     * Write the contents of a stream to files.
-     * <p>
-     * The default {@link FileWriterPolicy} is used.
-     * <p>
-     * This is the same as {@code textFileWriter(contents, basePathname, null)}.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * // write a stream of LogEvent to files, using the default
-     * // file writer policy
-     * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS
-     * TStream<MyLogEvent> events = ...
-     * TStream<String> stringEvents = events.map(event -> event.toString()); 
-     * FileStreams.textFileWriter(stringEvents, () -> basePathname);
-     * }</pre>
-     * @param contents the lines to write
-     * @param basePathname the base pathname of the created files
-     * @return a TSink
-     */
-    public static TSink<String> textFileWriter(TStream<String> contents,
-            Supplier<String> basePathname) {
-        return textFileWriter(contents, basePathname, null);
-    }
-    
-    /**
-     * Write the contents of a stream to files subject to the control
-     * of a file writer policy.
-     * <p>
-     * A separate policy instance must be used for invocation.
-     * A default {@link FileWriterPolicy} is used if a policy is not specified.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * // write a stream of LogEvent to files using a policy of:
-     * // no additional flush, 100 events per file, retain 5 files
-     * IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
-     *           FileWriterFlushConfig.newImplicitConfig(),
-     *           FileWriterCycleConfig.newCountBasedConfig(100),
-     *           FileWriterRetentionConfig.newFileCountBasedConfig(5)
-     *           );
-     * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS
-     * TStream<MyLogEvent> events = ...
-     * TStream<String> stringEvents = events.map(event -> event.toString()); 
-     * FileStreams.textFileWriter(stringEvents, () -> basePathname, () -> policy);
-     * }</pre>
-     * @param contents the lines to write
-     * @param basePathname the base pathname of the created files
-     * @param policy the policy to use.  may be null.
-     * @return a TSink
-     * @see FileWriterPolicy
-     */
-    public static TSink<String> textFileWriter(TStream<String> contents,
-            Supplier<String> basePathname, Supplier<IFileWriterPolicy<String>> policy) {
-        if (policy == null) {
-            IFileWriterPolicy<String> defaultPolicy = new FileWriterPolicy<String>(){};
-            policy = () -> defaultPolicy;
-        }
-        return contents.sink(new TextFileWriter(basePathname, policy));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
deleted file mode 100644
index 7053f37..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterCycleConfig.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import edgent.function.Predicate;
-
-/**
- * FileWriter active file cycle configuration control.
- * <p>
- * Cycling the active file closes it, gets it to its final pathname,
- * and induces the application of a retention policy
- * {@link FileWriterRetentionConfig}.
- * <p>
- * Cycling the active file can be any combination of:
- * <ul>
- * <li>after {@code fileSize} bytes have been written</li>
- * <li>after every {@code cntTuple} tuples written</li>
- * <li>after {@code tuplePredicate} returns true</li>
- * <li>after {@code periodMsec} has elapsed since the last time based cycle</li>
- * </ul>
- * 
- * @param <T> stream tuple type
- */
-public class FileWriterCycleConfig<T> {
-    private long fileSize;
-    private int cntTuples;
-    private long periodMsec;
-    private Predicate<T> tuplePredicate;
-    
-    /** same as {@code newConfig(fileSize, 0, 0, null)}
-     * 
-     * @param <T> Tuple type
-     * @param fileSize cycle after {@code fileSize} bytes have been written. 0 to disable.
-     * @return the cycle configuration
-     */
-    public static <T> FileWriterCycleConfig<T> newFileSizeBasedConfig(long fileSize) {
-        if (fileSize < 1)
-            throw new IllegalArgumentException("fileSize");
-        return newConfig(fileSize, 0, 0, null);
-    }
-    /** same as {@code newConfig(0, cntTuples, 0, null)}
-     * 
-     * @param <T> Tuple type
-     * @param cntTuples cycle after every {@code cntTuple} tuples have been written. 0 to disable.
-     * @return the cycle configuration
-     */
-    public static <T> FileWriterCycleConfig<T> newCountBasedConfig(int cntTuples) {
-        if (cntTuples < 1)
-            throw new IllegalArgumentException("cntTuples");
-        return newConfig(0, cntTuples, 0, null);
-    }
-    /** same as {@code newConfig(0, 0, periodMsec, null)}
-     * 
-     * @param <T> Tuple type
-     * @param periodMsec cycle after {@code periodMsec} has elapsed since the last time based cycle. 0 to disable.
-     * @return the cycle configuration
-     */
-    public static <T> FileWriterCycleConfig<T> newTimeBasedConfig(long periodMsec) {
-        if (periodMsec < 1)
-            throw new IllegalArgumentException("periodMsec");
-        return newConfig(0, 0, periodMsec, null);
-    }
-    /** same as {@code newConfig(0, 0, 0, tuplePredicate)}
-     * 
-     * @param <T> Tuple type
-     * @param tuplePredicate cycle if {@code tuplePredicate} returns true. null to disable.
-     * @return the cycle configuration
-     */
-    public static <T> FileWriterCycleConfig<T> newPredicateBasedConfig(Predicate<T> tuplePredicate) {
-        return newConfig(0, 0, 0, tuplePredicate);
-    }
-    
-    /**
-     * Create a new configuration.
-     * <p>
-     * At least one configuration mode must be enabled.
-     * @param <T> Tuple type
-     * @param fileSize cycle after {@code fileSize} bytes have been written. 0 to disable.
-     * @param cntTuples cycle after every {@code cntTuple} tuples have been written. 0 to disable.
-     * @param periodMsec cycle after {@code periodMsec} has elapsed since the last time based cycle. 0 to disable.
-     * @param tuplePredicate cycle if {@code tuplePredicate} returns true. null to disable.
-     * @return the cycle configuration
-     */
-    public static <T> FileWriterCycleConfig<T> newConfig(long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
-        return new FileWriterCycleConfig<>(fileSize, cntTuples, periodMsec, tuplePredicate);
-    }
-
-    private FileWriterCycleConfig(long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
-        if (fileSize < 0)
-            throw new IllegalArgumentException("fileSize");
-        if (cntTuples < 0)
-            throw new IllegalArgumentException("cntTuples");
-        if (periodMsec < 0)
-            throw new IllegalArgumentException("periodMsec");
-        if (fileSize==0 && cntTuples==0 && periodMsec==0 && tuplePredicate==null)
-            throw new IllegalArgumentException("no cycle configuration specified");
-            
-        this.fileSize = fileSize;
-        this.cntTuples = cntTuples;
-        this.periodMsec = periodMsec;
-        this.tuplePredicate = tuplePredicate;
-    }
-    
-    /**
-     * Get the file size configuration value.
-     * @return the value
-     */
-    public long getFileSize() { return fileSize; }
-    
-    /**
-     * Get the tuple count configuration value.
-     * @return the value
-     */
-    public int getCntTuples() { return cntTuples; }
-    
-    /**
-     * Get the time period configuration value.
-     * @return the value
-     */
-    public long getPeriodMsec() { return periodMsec; }
-    
-    /**
-     * Get the tuple predicate configuration value.
-     * @return the value
-     */
-    public Predicate<T> getTuplePredicate() { return tuplePredicate; }
-    
-    /**
-     * Evaluate if the specified values indicate that a cycling of
-     * the active file should be performed.
-     * @param fileSize the number of bytes written to the active file
-     * @param nTuples number of tuples written to the active file
-     * @param tuple the tuple written to the file
-     * @return true if a cycle action should be performed.
-     */
-    public boolean evaluate(long fileSize, int nTuples, T tuple) {
-        return (this.fileSize > 0 && fileSize > this.fileSize)
-                || (cntTuples > 0 && nTuples > 0 && nTuples % cntTuples == 0)
-                || (tuplePredicate != null && tuplePredicate.test(tuple));
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("fileSize:%d cntTuples:%d periodMsec:%d tuplePredicate:%s",
-                getFileSize(), getCntTuples(), getPeriodMsec(),
-                getTuplePredicate() == null ? "no" : "yes");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
deleted file mode 100644
index 55d691b..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterFlushConfig.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import edgent.function.Predicate;
-
-/**
- * FileWriter active file flush configuration control.
- * <p>
- * Flushing of the active file can be any combination of:
- * <ul>
- * <li>after every {@code cntTuple} tuples written</li>
- * <li>after {@code tuplePredicate} returns true</li>
- * <li>after {@code periodMsec} has elapsed since the last time based flush</li>
- * </ul>
- * If nothing specific is specified, the underlying buffered
- * writer's automatic flushing is utilized.
- * 
- * @param <T> stream tuple type
- */
-public class FileWriterFlushConfig<T> {
-    private int cntTuples;
-    private long periodMsec;
-    private Predicate<T> tuplePredicate;
-    
-    /**
-     * Create a new configuration.
-     * <p>
-     * The underlying buffered writer's automatic flushing is used.
-     * <p>
-     * Same as {@code newConfig(0, 0, null)}
-     * 
-     * @param <T> Tuple type
-     * @return the flush configuration
-     */
-    public static <T> FileWriterFlushConfig<T> newImplicitConfig() {
-        return newConfig(0,0,null);
-    }
-    /** same as {@code newConfig(cntTuples, 0, null)}
-     * 
-     * @param <T> Tuple type
-     * @param cntTuples flush every {@code cntTuple} tuples written. 0 to disable.
-     * @return the flush configuration
-     */
-    public static <T> FileWriterFlushConfig<T> newCountBasedConfig(int cntTuples) {
-        if (cntTuples < 1)
-            throw new IllegalArgumentException("cntTuples");
-        return newConfig(cntTuples, 0, null);
-    }
-    /** same as {@code newConfig(0, periodMsec, null)}
-     * 
-     * @param <T> Tuple type
-     * @param periodMsec flush every {@code periodMsec} milliseconds.  0 to disable.
-     * @return the flush configuration
-     */
-    public static <T> FileWriterFlushConfig<T> newTimeBasedConfig(long periodMsec) {
-        if (periodMsec < 1)
-            throw new IllegalArgumentException("periodMsec");
-        return newConfig(0, periodMsec, null);
-    }
-    /** same as {@code newConfig(0, 0, tuplePredicate)}
-     * 
-     * @param <T> Tuple type
-     * @param tuplePredicate flush if {@code tuplePredicate} is true. null to disable.
-     * @return the flush configuration
-     */
-    public static <T> FileWriterFlushConfig<T> newPredicateBasedConfig(Predicate<T> tuplePredicate) {
-        if (tuplePredicate == null)
-            throw new IllegalArgumentException("tuplePredicate");
-        return newConfig(0, 0, tuplePredicate);
-    }
-    /**
-     * Create a new configuration.
-     * <p>
-     * If nothing specific is specified, the underlying buffered
-     * writer's automatic flushing is utilized.
-     *
-     * @param <T> Tuple type
-     * @param cntTuples flush every {@code cntTuple} tuples written. 0 to disable.
-     * @param periodMsec flush every {@code periodMsec} milliseconds.  0 to disable.
-     * @param tuplePredicate flush if {@code tuplePredicate} is true. null to disable.
-     * @return the flush configuration
-     */
-    public static <T> FileWriterFlushConfig<T> newConfig(int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
-        return new FileWriterFlushConfig<>(cntTuples, periodMsec, tuplePredicate);
-    }
-    
-    private FileWriterFlushConfig(int cntTuples, long periodMsec, Predicate<T> tuplePredicate) {
-        if (cntTuples < 0)
-            throw new IllegalArgumentException("cntTuples");
-        if (periodMsec < 0)
-            throw new IllegalArgumentException("periodMsec");
-        this.cntTuples = cntTuples;
-        this.periodMsec = periodMsec;
-        this.tuplePredicate = tuplePredicate;
-    }    
-    
-    /**
-     * Get the tuple count configuration value.
-     * @return the value
-     */
-    public int getCntTuples() { return cntTuples; }
-    
-    /**
-     * Get the time period configuration value.
-     * @return the value
-     */
-    public long getPeriodMsec() { return periodMsec; }
-    
-    /**
-     * Get the tuple predicate configuration value.
-     * @return the value
-     */
-    public Predicate<T> getTuplePredicate() { return tuplePredicate; }
-    
-    /**
-     * Evaluate if the specified values indicate that a flush should be
-     * performed.
-     * @param nTuples number of tuples written to the active file
-     * @param tuple the tuple written to the file
-     * @return true if a flush should be performed.
-     */
-    public boolean evaluate(int nTuples, T tuple) {
-        return (cntTuples > 0 && nTuples > 0 && nTuples % cntTuples == 0)
-                || (tuplePredicate != null && tuplePredicate.test(tuple));
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("cntTuples:%d periodMsec:%d tuplePredicate:%s",
-                getCntTuples(), getPeriodMsec(),
-                getTuplePredicate() == null ? "no" : "yes");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
deleted file mode 100644
index c7849c4..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterPolicy.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.Flushable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.file.runtime.FileConnector;
-import edgent.connectors.file.runtime.IFileWriterPolicy;
-
-/**
- * A full featured {@link IFileWriterPolicy} implementation.
- * <p>
- * The policy implements strategies for:
- * <ul>
- * <li>Active and final file pathname control.</li>
- * <li>Active file flush control (via @{link FileWriterFlushControl})</li>
- * <li>Active file cycle control (when to close/finalize the current active file;
- *     via @{link FileWriterCycleControl})</li>
- * <li>file retention control (via @{link FileWriterRetentionControl})</li>
- * </ul>
- * The policy is very configurable.  If additional flexibility is required
- * the class can be extended and documented "hook" methods overridden,
- * or an alternative full implementation of {@code FileWriterPolicy} can be
- * created.
- * <p>
- * Sample use:
- * <pre>
- * FileWriterPolicy&lt;String&gt; policy = new FileWriterPolicy(
- *     FileWriterFlushConfig.newImplicitConfig(),
- *     FileWriterCycleConfig.newCountBasedConfig(1000),
- *     FileWriterRetentionConfig.newCountBasedConfig(10));
- * String basePathname = "/some/directory/and_base_name";
- * 
- * TStream&lt;String&gt; streamToWrite = ...
- * FileStreams.textFileWriter(streamToWrite, () -&gt; basePathname, () -&gt; policy)
- * </pre>
- * 
- * @param <T> stream tuple type
- * @see FileWriterFlushConfig
- * @see FileWriterCycleConfig
- * @see FileWriterRetentionConfig
- */
-public class FileWriterPolicy<T> implements IFileWriterPolicy<T> {
-    private static final Logger trace = FileConnector.getTrace();
-    private final FileWriterFlushConfig<T> flushConfig; 
-    private final FileWriterCycleConfig<T> cycleConfig; 
-    private final FileWriterRetentionConfig retentionConfig; 
-    private String basePathname;
-    private Path parent;
-    private String baseLeafname;
-    private Flushable flushable;
-    private Closeable closeable;
-    private volatile int curTupleCnt;
-    private volatile long curSize;
-    private volatile boolean flushIt;
-    private volatile boolean cycleIt;
-    private volatile String lastYmdhms;
-    private volatile int lastMinorSuffix;
-    private final List<Path> retainedPaths = new ArrayList<>(); // oldest first
-    private volatile ScheduledExecutorService executor;
-    
-    /**
-     * Create a new file writer policy instance.
-     * <p>
-     * The configuration is:
-     * <ul>
-     * <li>10 second time based active file flushing</li>
-     * <li>1MB file size based active file cycling</li>
-     * <li>10 file retention count</li>
-     * </ul>
-     * The active and final file pathname behavior is specified in
-     * {@link #FileWriterPolicy(FileWriterFlushConfig, FileWriterCycleConfig, FileWriterRetentionConfig)}
-     */
-    public FileWriterPolicy() {
-        this(FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.SECONDS.toMillis(10)),
-            FileWriterCycleConfig.newFileSizeBasedConfig(1*1024*1024),
-            FileWriterRetentionConfig.newFileCountBasedConfig(10)); 
-    }
-    
-    /**
-     * Create a new file writer policy instance.
-     * <p>
-     * {@code flushConfig}, {@code cycleConfig} and {@code retentionConfig}
-     * specify the configuration of the various controls.
-     * <p>
-     * The active file and final file pathnames are based
-     * on the {@code basePathname} received in 
-     * {@link #initialize(String, Flushable, Closeable)}.
-     * <p>
-     * Where {@code parent} and {@code baseLeafname} are the 
-     * parent path and file name respectively of {@code basePathname}:
-     * <ul>
-     * <li>the active file is {@code parent/.baseLeafname}"</li>
-     * <li>final file names are {@code parent/baseLeafname_YYYYMMDD_HHMMSS[_<n>]}
-     *     where the optional {@code _<n>} suffix is only present if needed
-     *     to distinguish a file from the previously finalized file.
-     *     {@code <n>} starts at 1 and is monotonically incremented.
-     *     </li>
-     * </ul>
-     * @param flushConfig active file flush control configuration
-     * @param cycleConfig active file cycle control configuration
-     * @param retentionConfig final file retention control configuration
-     */
-    public FileWriterPolicy(FileWriterFlushConfig<T> flushConfig,
-            FileWriterCycleConfig<T> cycleConfig,
-            FileWriterRetentionConfig retentionConfig) {
-        this.flushConfig = flushConfig;
-        this.cycleConfig = cycleConfig;
-        this.retentionConfig = retentionConfig;
-    }
-    
-    @Override
-    public void close() {
-        if (executor != null) {
-            executor.shutdownNow();
-        }
-    }
-
-    /**
-     * Get the policy's active file flush configuration
-     * @return the flush configuration
-     */
-    public FileWriterFlushConfig<T> getFlushConfig() {
-        return flushConfig;
-    }
-
-    /**
-     * Get the policy's active file cycle configuration
-     * @return the cycle configuration
-     */
-    public FileWriterCycleConfig<T> getCycleConfig() {
-        return cycleConfig;
-    }
-
-    /**
-     * Get the policy's retention configuration
-     * @return the retention configuration
-     */
-    public FileWriterRetentionConfig getRetentionConfig() {
-        return retentionConfig;
-    }
-
-    @Override
-    public void initialize(String basePathname, Flushable flushable,
-            Closeable closeable) {
-        this.basePathname = basePathname;
-        this.flushable = flushable;
-        this.closeable = closeable;
-        Path basePath = new File(basePathname).toPath();
-        this.parent = basePath.getParent();
-        this.baseLeafname = basePath.getFileName().toString();
-        
-        if (flushConfig.getPeriodMsec() > 0) {
-            long periodMsec = flushConfig.getPeriodMsec();
-            getExecutor().scheduleAtFixedRate(
-                    () -> { try { this.flushable.flush(); }
-                    catch (IOException e) { /*ignore*/ }
-                }, 
-                periodMsec, periodMsec, TimeUnit.MILLISECONDS);
-        }
-        if (cycleConfig.getPeriodMsec() > 0) {
-            long periodMsec = cycleConfig.getPeriodMsec();
-            getExecutor().scheduleAtFixedRate(
-                    () -> { try { this.closeable.close(); }
-                    catch (IOException e) { /*ignore*/ }
-                }, 
-                periodMsec, periodMsec, TimeUnit.MILLISECONDS);
-        }
-        if (retentionConfig.getAgeSec() > 0) {
-            long periodMsec = retentionConfig.getPeriodMsec();
-            getExecutor().scheduleAtFixedRate(
-                    () -> applyTimeBasedRetention(), 
-                    periodMsec, periodMsec, TimeUnit.MILLISECONDS);
-        }
-    }    
-
-    private ScheduledExecutorService getExecutor() {
-        if (executor == null) {
-            executor = Executors.newSingleThreadScheduledExecutor();
-        }
-        return executor;
-    }
-    
-    @Override
-    public void wrote(T tuple, long nbytes) {
-        curSize += nbytes; 
-        curTupleCnt++;
-        flushIt = flushConfig.evaluate(curTupleCnt, tuple);
-        cycleIt = cycleConfig.evaluate(curSize, curTupleCnt, tuple);
-    }
-    
-    @Override
-    public boolean shouldFlush() {
-        boolean b = flushIt;
-        flushIt = false;
-        return b;
-    }
-    
-    @Override
-    public boolean shouldCycle() {
-        boolean b = cycleIt;
-        cycleIt = false;
-        return b;
-    }
-    
-    @Override
-    public Path getNextActiveFilePath() {
-        Path path = hookGenerateNextActiveFilePath();
-        trace.trace("next active file path={}", path);
-        return path;
-    }
-    
-    @Override
-    public synchronized Path closeActiveFile(Path path) throws IOException {
-        int tmpCurTupleCnt = curTupleCnt;
-        resetActiveFileInfo();
-        Path finalPath = hookGenerateFinalFilePath(path);
-        trace.trace("closing active file nTuples={}, finalPath={}", tmpCurTupleCnt, finalPath);
-        hookRenameFile(path, finalPath);
-        retainedPaths.add(finalPath);
-        applyRetention();
-        return finalPath;
-    }
-    
-    private void resetActiveFileInfo() {
-        curSize = 0;
-        curTupleCnt = 0;
-        flushIt = false;
-        cycleIt = false;
-    }
-    
-    private synchronized void applyRetention() {
-        long aggregateFileSize = 0; // compute when enabled
-        if (retentionConfig.getAggregateFileSize() > 0) {
-            for (Path path : retainedPaths) {
-                File file = path.toFile();
-                aggregateFileSize += file.length(); // 0 if doesn't exist
-            }
-        }
-        
-        if (retentionConfig.evaluate(retainedPaths.size(), aggregateFileSize)) {
-            Path oldestPath = retainedPaths.remove(0);
-            File file = oldestPath.toFile();
-            trace.info("deleting file {}", file);
-            file.delete();
-        }
-    }
-    
-    private synchronized void applyTimeBasedRetention() {
-        long now = System.currentTimeMillis();
-        long minTime = now - TimeUnit.SECONDS.toMillis(retentionConfig.getAgeSec());
-        ArrayList<Path> toDelete = new ArrayList<>();
-        for (Path path : retainedPaths) {  // oldest first
-            File file = path.toFile();
-            if (file.lastModified() < minTime)
-                toDelete.add(path);
-            else
-                break;
-        }
-        for (Path path : toDelete) {
-            trace.info("deleting file {}", path);
-            path.toFile().delete();
-        }
-        retainedPaths.removeAll(toDelete);
-    }
-    
-    private String ymdhms() {
-        return new SimpleDateFormat("YYYYMMdd_HHmmss").format(new Date());
-    }
-    
-    /**
-     * Generate the final file path for the active file.
-     * <p>
-     * The default implementation yields:
-     * <br>
-     * final file names are {@code basePathname_YYYYMMDD_HHMMSS[_<n>]}
-     * where the optional {@code _<n>} suffix is only present if needed
-     * to distinguish a file from the previously finalized file.
-     * {@code <n>} starts at 1 and is monitonically incremented.
-     * <p>
-     * This hook method can be overridden.
-     * <p>
-     * Note, the implementation must handle the unlikely, but happens
-     * in tests, case where files are cycling very fast (multiple per sec)
-     * and the retention config tosses some within that same second.
-     * I.e., avoid generating final path sequences like:
-     * <pre>
-     * leaf_YYYYMMDD_103099
-     * leaf_YYYYMMDD_103099_1
-     * leaf_YYYYMMDD_103099_2
-     *   delete leaf_YYYYMMDD_103099  -- retention cnt was 2
-     * leaf_YYYYMMDD_103099   // should be _3
-     * </pre>
-     * 
-     * @param path the active file path to finalize
-     * @return final path for the file
-     */
-    protected Path hookGenerateFinalFilePath(Path path) {
-        String ymdhms = ymdhms();
-        if (ymdhms.equals(lastYmdhms)) {
-            lastMinorSuffix++;
-        }
-        else {
-            lastMinorSuffix = 0;
-            lastYmdhms = ymdhms;
-        }
-        String pathStr = String.format("%s_%s", basePathname, ymdhms);
-        String finalPathStr = pathStr;
-        if (lastMinorSuffix > 0)
-            finalPathStr += "_" + lastMinorSuffix;
-        return new File(finalPathStr).toPath();
-    }
-    
-    /**
-     * Generate the path for the next active file.
-     * <p>
-     * The default implementation yields {@code parent/.baseLeafname}
-     * from {@code basePathname}.
-     * <p>
-     * This hook method can be overridden.
-     * <p>
-     * See {@link IFileWriterPolicy#getNextActiveFilePath()} regarding
-     * constraints.
-     * 
-     * @return path to use for the next active file.
-     */
-    protected Path hookGenerateNextActiveFilePath() {
-        return parent.resolve("." + baseLeafname);
-    }
-
-    /**
-     * "Rename" the active file to the final path.
-     * <p>
-     * The default implementation uses {@code java.io.File.renameTo()}
-     * and works for the default {@link #hookGenerateNextActiveFilePath()}
-     * and {@link #hookGenerateFinalFilePath(Path path)} implementations.
-     * <p>
-     * This hook method can be overridden.
-     * 
-     * @param activePath path of the active file
-     * @param finalPath path to the final destination
-     * @throws IOException on failure
-     */
-    protected void hookRenameFile(Path activePath, Path finalPath) throws IOException {
-        trace.info("finalizing to {}", finalPath);
-        activePath.toFile().renameTo(finalPath.toFile());
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("basePathname:%s [retention: %s] [cycle: %s] [flush: %s]",
-                basePathname,
-                retentionConfig.toString(),
-                cycleConfig.toString(), flushConfig.toString());
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java b/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
deleted file mode 100644
index 3cc51a4..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/FileWriterRetentionConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.file;
-
-/**
- * FileWriter finalized (non-active) file retention configuration control.
- * <p>
- * File removal can be any combination of:
- * <ul>
- * <li>remove a file when {@code fileCount} would be exceeded</li>
- * <li>remove a file when {@code aggregateFileSize} would be exceeded</li>
- * <li>remove a file that's older than {@code ageSec} seconds</li>
- * </ul>
- */
-public class FileWriterRetentionConfig {
-    private int fileCount;
-    private long aggregateFileSize;
-    private long ageSec;
-    private long periodMsec;
-    
-    /** same as {@code newConfig(fileCount, 0, 0, 0)}
-     * 
-     * @param fileCount remove a file when {@code fileCount} would be exceeded. 0 to disable.
-     * @return the retention config
-     */
-    public static  FileWriterRetentionConfig newFileCountBasedConfig(int fileCount) {
-        if (fileCount < 1)
-            throw new IllegalArgumentException("fileCount");
-        return newConfig(fileCount, 0, 0, 0);
-    }
-    /** same as {@code newConfig(0, aggregateFileSize, 0, 0)}
-     * 
-     * @param aggregateFileSize remove a file when {@code aggregateFileSize} would be exceeded. 0 to disable.
-     * @return the retention config
-     */
-    public static  FileWriterRetentionConfig newAggregateFileSizeBasedConfig(long aggregateFileSize) {
-        if (aggregateFileSize < 1)
-            throw new IllegalArgumentException("aggregateFileSize");
-        return newConfig(0, aggregateFileSize, 0, 0);
-    }
-    /** same as {@code newConfig(0, 0, ageSe, periodMsecc)}
-     * 
-     * @param ageSec remove a file that's older than {@code ageSec} seconds.  0 to disable.
-     * @param periodMsec frequency for checking for ageSec based removal. 0 to disable.]
-     * @return the retention config
-     */
-    public static  FileWriterRetentionConfig newAgeBasedConfig(long ageSec, long periodMsec) {
-        if (ageSec < 1)
-            throw new IllegalArgumentException("ageSec");
-        if (periodMsec < 1)
-            throw new IllegalArgumentException("periodMsec");
-        return newConfig(0, 0, ageSec, periodMsec);
-    }
-    
-    /**
-     * Create a new configuration.
-     * 
-     * @param fileCount remove a file when {@code fileCount} would be exceeded. 0 to disable.
-     * @param aggregateFileSize remove a file when {@code aggregateFileSize} would be exceeded. 0 to disable.
-     * @param ageSec remove a file that's older than {@code ageSec} seconds.  0 to disable.
-     * @param periodMsec frequency for checking for ageSec based removal. 0 to disable.]
-     * @return the retention config
-     */
-    public static FileWriterRetentionConfig newConfig(int fileCount, long aggregateFileSize, long ageSec, long periodMsec) {
-        return new FileWriterRetentionConfig(fileCount, aggregateFileSize, ageSec, periodMsec);
-    }
-    
-    private FileWriterRetentionConfig(int fileCount, long aggregateFileSize, long ageSec, long periodMsec) {
-        if (fileCount < 0)
-            throw new IllegalArgumentException("fileCount");
-        if (aggregateFileSize < 0)
-            throw new IllegalArgumentException("aggregateFileSize");
-        if (ageSec < 0)
-            throw new IllegalArgumentException("ageSec");
-        if (periodMsec < 0)
-            throw new IllegalArgumentException("periodMsec");
-        if (fileCount==0 && aggregateFileSize==0 && (ageSec==0 || periodMsec==0))
-            throw new IllegalArgumentException("no retention configuration specified");
-        this.fileCount = fileCount;
-        this.aggregateFileSize = aggregateFileSize;
-        this.ageSec = ageSec;
-        this.periodMsec = periodMsec;
-    }
-    
-    /**
-     * Get the file count configuration value.
-     * @return the value
-     */
-    public int getFileCount() { return fileCount; }
-    
-    /**
-     * Get the aggregate file size configuration value.
-     * @return the value
-     */
-    public long getAggregateFileSize() { return aggregateFileSize; }
-    
-    /**
-     * Get the file age configuration value.
-     * @return the value
-     */
-    public long getAgeSec() { return ageSec; }
-    
-    /**
-     * Get the time period configuration value.
-     * @return the value
-     */
-    public long getPeriodMsec() { return periodMsec; }
-    
-    /**
-     * Evaluate if the specified values indicate that a final file should
-     * be removed.
-     *
-     * @param fileCount the current number of retained files
-     * @param aggregateFileSize the aggregate size of all of the retained files
-     * @return true if a retained file should be removed.
-     */
-    public boolean evaluate(int fileCount, long aggregateFileSize) {
-        return (this.fileCount > 0 && fileCount > this.fileCount)
-                || (this.aggregateFileSize > 0 && aggregateFileSize > this.aggregateFileSize);
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("fileCount:%d aggSize:%d ageSec:%d",
-                getFileCount(), getAggregateFileSize(), getAgeSec());
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/package-info.java b/connectors/file/src/main/java/edgent/connectors/file/package-info.java
deleted file mode 100644
index 5c11884..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * File stream connector.
- * <p>
- * Stream tuples may be written to files
- * and created by reading from files.
- */
-package edgent.connectors.file;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
deleted file mode 100644
index 970f8df..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/AbstractWriterFile.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-/**
- * Generic class for writing of tuples to a file.
- * <p>
- * The class is not responsible for flush strategy, finalize strategy, etc
- */
-abstract class AbstractWriterFile<T> {
-    private final Path path;
-    protected long size;
-    private long tupleCnt;
-    public AbstractWriterFile(Path path) {
-        this.path = path;
-    }
-    public Path path() { return path; }
-    public long size() { return size; }
-    public long tupleCnt() { return tupleCnt; }
-    public abstract void flush() throws IOException;
-    public abstract void close() throws IOException;
-    /** do what's needed to write the tuple */
-    protected abstract int writeTuple(T tuple) throws IOException;
-    /**
-     * @param tuple the tuple to write 
-     * @return the number of bytes written
-     * @throws IOException on failure
-     */
-    public int write(T tuple) throws IOException {
-        tupleCnt++;
-        int nbytes = writeTuple(tuple);
-        size += nbytes;
-        return nbytes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
deleted file mode 100644
index d0fc100..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/DirectoryWatcher.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
-import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
-import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-
-import edgent.function.Supplier;
-
-/**
- * Watch a directory for files being added to it and create a stream
- * of pathname strings for the files.
- * <p>
- * Hidden files (files starting with ".") are ignored.
- * <p>
- * The order of the files in the stream is dictated by a {@link Comparator}.
- * The default comparator orders files by {@link File#lastModified()} values.
- * There are no guarantees on the processing order of files that
- * have the same lastModified value.
- * Note, lastModified values are subject to filesystem timestamp
- * quantization - e.g., 1second.
- * <p>
- * Note: due to the asynchronous nature of things, if files in the
- * directory may be removed, the receiver of a tuple with a "new" file
- * pathname may need to be prepared for the pathname to no longer be
- * valid when it receives the tuple or during its processing of the tuple.
- * <p>
- * The behavior on MacOS may be unsavory, even as recent as Java8, as
- * MacOs Java lacks a native implementation of {@link WatchService}.
- * The result can be a delay in detecting newly created files (e.g., 10sec)
- * as well not detecting rapid deletion and recreation of a file.
- * See:
- * http://stackoverflow.com/questions/9588737/is-java-7-watchservice-slow-for-anyone-else
- */
-
-public class DirectoryWatcher implements AutoCloseable, 
-        FileFilter, Iterable<String> {
-
-    private static final Logger trace = FileConnector.getTrace();
-    private final Supplier<String> dirSupplier;
-    private final Comparator<File> comparator;
-    private final Set<String> seenFiles = Collections.synchronizedSet(new HashSet<>());
-    private volatile File dirFile;
-    private WatchService watcher;
-    
-    private Queue<String> pendingNames = new LinkedList<>();
-    
-
-    /**
-     * Watch the specified directory and generate tuples corresponding
-     * to files that are created in the directory.
-     * <p>
-     * If a null {@code comparator} is specified, the default comparator
-     * described in {@link DirectoryWatcher} is used.
-     * 
-     * @param dirSupplier the directory to watch
-     * @param comparator a comparator to order the processing of
-     *        multiple newly seen files in the directory.  may be null.
-     */
-    public DirectoryWatcher(Supplier<String> dirSupplier, Comparator<File> comparator) {
-        this.dirSupplier = dirSupplier;
-        if (comparator == null) {
-            comparator = // TODO 2nd order alfanum compare when same LMT?
-                    (o1,o2) -> Long.compare(o1.lastModified(),
-                                            o2.lastModified());
-        }
-        this.comparator = comparator;
-    }
-    
-    private void initialize() throws IOException {
-        dirFile = new File(dirSupplier.get());
-        
-        trace.info("watching directory {}", dirFile);
-        
-        Path dir = dirFile.toPath();
-
-        watcher = FileSystems.getDefault().newWatchService();
-        dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE);
-
-        sortAndSubmit(Arrays.asList(dirFile.listFiles(this)));
-    }
-
-    @Override
-    public void close() throws IOException {
-        watcher.close();
-    }
-
-    protected void sortAndSubmit(List<File> files) {
-        if (files.size() > 1) {
-            Collections.sort(files, comparator);
-        }
-
-        for (File file : files) {
-            if (accept(file) && file.exists()) {
-                pendingNames.add(file.getAbsolutePath());
-                seenFiles.add(file.getName());
-            }
-        }
-    }
-
-    /**
-     * Waits for files to become available 
-     * and adds them through {@link #sortAndSubmit(List)}
-     * to the pendingNames list which the iterator pulls from.
-     */
-    @SuppressWarnings("unchecked")
-    private void watchForFiles() throws Exception {
-
-        WatchKey key = watcher.take();
-
-        List<File> newFiles = new ArrayList<>();
-        boolean needFullScan = false;
-        for (WatchEvent<?> watchEvent : key.pollEvents()) {
-
-            if (ENTRY_CREATE == watchEvent.kind()) {
-                Path newPath = ((WatchEvent<Path>) watchEvent).context();
-                File newFile = toAbsFile(newPath);
-                if (accept(newFile))
-                    newFiles.add(newFile);
-            } else if (ENTRY_DELETE == watchEvent.kind()) {
-                Path deletedPath = ((WatchEvent<Path>) watchEvent).context();
-                File deletedFile = toAbsFile(deletedPath);
-                seenFiles.remove(deletedFile.getName());
-            } else if (OVERFLOW == watchEvent.kind()) {
-                needFullScan = true;
-            }
-        }
-        key.reset();
-
-        if (needFullScan) {
-            Collections.addAll(newFiles, dirFile.listFiles(this));
-        }
-        sortAndSubmit(newFiles);
-    }
-
-    private File toAbsFile(Path relPath) {
-        return new File(dirFile, relPath.getFileName().toString());
-    }
-
-    @Override
-    public boolean accept(File pathname) {
-        // our "filter" function
-        return !pathname.getName().startsWith(".")
-                && !seenFiles.contains(pathname.getName());
-    }
-
-    @Override
-    public Iterator<String> iterator() {
-        try {
-            initialize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return new WatcherIterator();
-    }
-    
-    /*
-     * Iterator that returns the file names.
-     * It is endless for hasNext() always returns
-     * true, and next() will block in WatcherService.take
-     * if no files are available.
-     */
-    private class WatcherIterator implements Iterator<String> {
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public String next() {
-
-            for (;;) {
-
-                String name = pendingNames.poll();
-                if (name != null)
-                    return name;
-
-                // blocks until a file appears
-                // note that even when watchForFiles()
-                // returns pendingNames might still be empty
-                // due to filtering.
-                try {
-                    watchForFiles();
-                } catch (InterruptedException e) {
-                    // interpret as shutdown
-                    trace.debug("Interrupted");
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
deleted file mode 100644
index 614a2b0..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/FileConnector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileConnector {
-    @SuppressWarnings("unused")
-    private static final FileConnector forCodeCoverage = new FileConnector();
-    private static final Logger TRACER = LoggerFactory.getLogger(FileConnector.class);
-    
-    private FileConnector() {}
-    
-    public static Logger getTrace() {
-        return TRACER;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
deleted file mode 100644
index 98dff9c..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/IFileWriterPolicy.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.nio.file.Path;
-
-import edgent.connectors.file.FileStreams;
-
-/**
- * An interface for file writer policies.
- * <p>
- * {@code IFileWriterPolicy} is for use by file writer implementations
- * for interacting with a policy implementation.
- * <p>
- * A policy generally implements strategies related to:
- * <ul>
- * <li>Active and final file pathname control.</li>
- * <li>Active file flush control</li>
- * <li>Active file cycle control (when to close/finalize the current active file)</li>
- * <li>file retention control</li>
- * </ul>
- * <p>
- * A file writer uses a {@code IFileWriterPolicy} in the following manner:
- * <pre>
- * IFileWriterPolicy&lt;T&gt; policy = some policy implementation
- * policy.initialize(basePathname, () -&gt; myFlushFn(), () -&gt; myCycleFn());
- * Path activeFilePath = null;
- * for each tuple {
- *   if (activePathFile == null) {
- *     activeFilePath = policy.getNextActivePath();
- *     open an output stream to the path
- *   }
- *   write the appropriate contents to the active file output stream
- *   policy.wrote(tuple, number of bytes written);
- *   if (policy.shouldCycle()) {
- *     close the active file output stream
- *     policy.closeActiveFile(activeFilePath);
- *     activeFilePath = null;
- *   } 
- *   if (policy.shouldFlush()) {
- *     flush the active file output stream
- *   }
- * }
- * policy.close();
- * 
- * void myFlushFn() {
- *   flush the active file output stream
- * }
- * 
- * void myCycleFn() {
- *   close the active file output stream
- *   policy.closeActiveFile(activeFilePath);
- *   activeFilePath = null;
- * }
- * </pre>
- * 
- * @param <T> stream tuple type
- */
-public interface IFileWriterPolicy<T> {
-    
-    /**
-     * Initialize the policy with the base pathname of files to generate
-     * and objects that can be
-     * called to perform timer based flush or close (cycle) of the active file.
-     * <p>
-     * Cycling involves finalizing the active file (getting it to its
-     * final destination / pathname) and applying any retention policy.
-     * <p>
-     * The supplied {@code closeable} must close the active file's output stream
-     * and then call {@link #closeActiveFile(Path)}.
-     * <p>
-     * For non-timer based strategies, the file writer generally triggers
-     * flush and cycle processing
-     * following a tuple write as informed by {@link #shouldCycle()} and
-     * {@link #shouldFlush()}. 
-     * 
-     * @param basePathname the directory and base leafname for final files
-     * @param flushable function to perform the flush 
-     * @param closeable function to perform the close
-     */
-    void initialize(String basePathname, Flushable flushable, Closeable closeable);
-    
-    /**
-     * Inform the policy of every tuple written to the active file.
-     * <p>
-     * The policy can use this to update its state so that it
-     * can answer the questions {@link #shouldFlush()}
-     * and {@link #shouldCycle()} for count, size, or
-     * tuple attribute based policies.
-     * <p>
-     * The policy can also use this to update its state
-     * for implementing time based flush and cycle policies. 
-     * @param tuple the tuple written
-     * @param nbytes the number of bytes written
-     */
-    void wrote(T tuple, long nbytes);
-    
-    /**
-     * Answers the question "should the active file be flushed?".
-     * <p>
-     * The state is reset to false after this returns.
-     * @return true if the active file should be flushed
-     */
-    boolean shouldFlush();
-    
-    /**
-     * Answers the question "should the active file be cycled?".
-     * <p>
-     * The state is reset to false after this returns.
-     * @return true if the active file should be cycled
-     */
-    boolean shouldCycle();
-    
-    /**
-     * Return the path for the next active file to write to.
-     * <p>
-     * If there was a current active file, {@link #closeActiveFile(Path)}
-     * must be called prior to this.
-     * <p>
-     * The leafname must be a hidden file ({@code java.io.File.isHidden()==true}
-     * to be compatible with a directory watcher
-     * {@link FileStreams#directoryWatcher(edgent.topology.TopologyElement, edgent.function.Supplier)}
-     * 
-     * @return path for the active file
-     */
-    Path getNextActiveFilePath();
-    
-    /**
-     * Close the active file {@code path}.
-     * <p>
-     * Generate the final path for the active file and  
-     * rename/move/copy it as necessary to be at that final path.
-     * <p>
-     * Apply the retention policy.
-     * <p>
-     * The active file's writer iostream must be closed prior to calling this.
-     * 
-     * @param path the active file (from {@link #getNextActiveFilePath()}).
-     * @return the final path
-     * @throws IOException on failure
-     */
-    Path closeActiveFile(Path path) throws IOException;
-    
-    /**
-     * Release any resources utilized by this policy.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
deleted file mode 100644
index a7efc94..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/StringWriterFile.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-class StringWriterFile extends AbstractWriterFile<String> {
-    private static Logger trace = FileConnector.getTrace();
-    private BufferedWriter bw;
-    private final Charset cs;
-
-    public StringWriterFile(Path path, Charset cs) {
-        super(path);
-        this.cs = cs;
-    }
-
-    @Override
-    protected int writeTuple(String tuple) throws IOException {
-        if (bw == null) {
-            trace.info("creating file {}", path());
-            bw = Files.newBufferedWriter(path(), cs);
-        }
-        bw.write(tuple);
-        bw.write("\n");
-        // ugh. inefficient
-        int nbytes = tuple.getBytes(cs).length;
-        nbytes++;
-        return nbytes;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        if (bw != null) {
-            trace.trace("flushing {}", path());
-            bw.flush();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (bw != null) {
-            trace.info("closing {}", path());
-            BufferedWriter bw = this.bw;
-            this.bw = null;
-            bw.close();
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
deleted file mode 100644
index 6682a51..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-public class TextFileReader extends Pipe<String,String> {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger trace = FileConnector.getTrace();
-    private volatile String encoding = "UTF-8";
-    private volatile Charset charset;
-    private volatile boolean shutdown;
-    private volatile Function<String,String> preFn = path -> null;
-    private volatile BiFunction<String,Exception,String> postFn = (path,exc) -> null;
-
-    private void setShutdown(boolean b) {
-        shutdown = b;
-    }
-
-    private boolean isShutdown() {
-        return shutdown;
-    }
-    
-    private String getEncoding() {
-        return encoding;
-    }
-    
-    public void setPre(Function<String,String> preFn) {
-        if (preFn == null)
-            this.preFn = path -> null;
-        else
-            this.preFn = preFn;
-    }
-    
-    public void setPost(BiFunction<String,Exception,String> postFn) {
-        if (postFn == null)
-            this.postFn = (path,exc) -> null;
-        else
-            this.postFn = postFn;
-    }
-
-    @Override
-    public synchronized void initialize(OpletContext<String,String> context) {
-        super.initialize(context);
-
-        charset = Charset.forName(getEncoding());
-    }
-    
-    private void pre(String pathname, Consumer<String> dst) {
-        String preStr = preFn.apply(pathname);
-        if (preStr != null)
-            dst.accept(preStr);
-    }
-    
-    private void post(String pathname, Exception e, Consumer<String> dst) {
-        String postStr = postFn.apply(pathname, e);
-        if (postStr != null)
-            dst.accept(postStr);
-    }
-
-    @Override
-    public void accept(String pathname) {
-        trace.trace("reading path={}", pathname);
-        Consumer<String> dst = getDestination();
-        pre(pathname, dst);
-        Path path = new File(pathname).toPath();
-        Exception exc = null;
-        int nlines = 0;
-        try (BufferedReader br = Files.newBufferedReader(path, charset)) {
-            for (int i = 0;;i++) {
-                if (i % 10 == 0 && isShutdown())
-                    break;
-                String line = br.readLine();
-                if (line == null)
-                    break;
-                nlines++;
-                dst.accept(line);
-            }
-        }
-        catch (IOException e) {
-            trace.error("Error processing file '{}'", pathname, e);
-            exc = e;
-        }
-        finally {
-            trace.trace("done reading nlines={} path={} ", nlines, pathname);
-            post(pathname, exc, dst);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        setShutdown(true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java b/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
deleted file mode 100644
index ab67ed5..0000000
--- a/connectors/file/src/main/java/edgent/connectors/file/runtime/TextFileWriter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.file.runtime;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-
-public class TextFileWriter implements Consumer<String>, AutoCloseable {
-    private static final long serialVersionUID = 1L;
-    static final Logger trace = FileConnector.getTrace();
-    private volatile String encoding = "UTF-8";
-    private volatile Charset charset;
-    private final Supplier<String> basePathname;
-    private final Supplier<IFileWriterPolicy<String>> policyFn;
-    private volatile boolean initialized;
-    private volatile IFileWriterPolicy<String> policy;
-    private StringWriterFile activeFile;
-    
-    private String getEncoding() {
-        return encoding;
-    }
-
-    public TextFileWriter(Supplier<String> basePathname, Supplier<IFileWriterPolicy<String>> policy) {
-        this.basePathname = basePathname;
-        this.policyFn = policy;
-        charset = Charset.forName(getEncoding());
-    }
-    
-    private IFileWriterPolicy<String> getPolicy() {
-        if (policy == null) {
-            policy = policyFn.get();
-        }
-        return policy;
-    }
-    
-    private void initialize() {
-        getPolicy().initialize(basePathname.get(),
-                                () -> flushActiveFile(),
-                                () -> closeActiveFile());
-        initialized = true;
-        trace.info("writer policy: {}", getPolicy());
-    }
-    
-    private synchronized void flushActiveFile() {
-        if (activeFile != null) {
-            try {
-                activeFile.flush();
-            } catch (IOException e) {
-                trace.trace("flush of {} failed", activeFile.path(), e);
-            }
-        }
-    }
-
-    @Override
-    public void accept(String line) {
-        if (!initialized)
-            initialize();
-        writeLine(line);
-    }
-    
-    private void writeLine(String line) {
-        // prevent async time based cycle or flush while writing the tuple
-        synchronized(this) {
-            try {
-                if (activeFile == null) {
-                    newActiveFile();
-                }
-                int nbytes = activeFile.write(line);
-                getPolicy().wrote(line, nbytes);
-            }
-            catch (IOException e) {
-                trace.error("Error writing tuple {} of length {} to {}",
-                        activeFile.tupleCnt(), line.length(), activeFile.path(), e);
-            }
-        }
-        if (getPolicy().shouldCycle()) {
-            closeActiveFile();
-        }
-        else if (getPolicy().shouldFlush()) {
-            flushActiveFile();
-        }
-    }
-    
-    private synchronized void newActiveFile() throws IOException {
-        Path path = getPolicy().getNextActiveFilePath();
-        activeFile = new StringWriterFile(path, charset);
-    }
-
-    /**
-     * close, finalize, and apply retention policy
-     */
-    private synchronized void closeActiveFile() {
-        StringWriterFile activeFile = this.activeFile;
-        try {
-            this.activeFile = null;
-            if (activeFile != null) {
-                activeFile.close();
-                getPolicy().closeActiveFile(activeFile.path());
-                activeFile = null;
-            }
-        }
-        catch (IOException e) {
-            trace.error("error closing active file '{}'", activeFile.path(), e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeActiveFile();
-        getPolicy().close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java b/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
new file mode 100644
index 0000000..1a6272c
--- /dev/null
+++ b/connectors/file/src/main/java/org/apache/edgent/connectors/file/CompressedFileWriterPolicy.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.file;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * A {@link FileWriterPolicy} that generates zip compressed files.
+ * <P>
+ * {@code CompressedFileWriterPolicy} is used exactly like {@code FileWriterPolicy}.
+ * The generated file names are identical to those generated by {@code FileWriterPolicy}
+ * except they have a {@code .zip} suffix.
+ * </P>
+ * <P>
+ * The active file is uncompressed.  
+ * It is compressed when cycled per the {@link FileWriterCycleConfig}.
+ * Hence, a {@link FileWriterCycleConfig#newFileSizeBasedConfig(long) file size based}
+ * cycle config specifies the size of the uncompressed active file.
+ * </P>
+ * <P>
+ * An {@link FileWriterRetentionConfig#newAggregateFileSizeBasedConfig(long) aggregate
+ * file size} based retention config specifies the total size of the
+ * retained compressed files.
+ * </P>
+ * Sample use:
+ * <pre>{@code
+ * // Create a CompressedFileWriterPolicy with the configuration:
+ * // no explicit flush; cycle the active file when it exceeds 200Kb;
+ * // retain up to 1Mb of compressed files.
+ * IFileWriterPolicy<String> policy = new CompressedFileWriterPolicy(
+ *     FileWriterFlushConfig.newImplicitConfig(),
+ *     FileWriterCycleConfig.newFileSizeBasedConfig(200_000),
+ *     FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(1_000_000));
+ * String basePathname = "/some/directory/and_base_name";
+ * 
+ * TStream<String> streamToWrite = ...
+ * FileStreams.textFileWriter(streamToWrite, () -> basePathname, () -> policy)
+ * }</pre>
+ *
+ * @param <T> stream tuple type
+ */
+public class CompressedFileWriterPolicy<T> extends FileWriterPolicy<T> {
+  
+  private final static String SUFFIX = ".zip";
+  private final static int BUFSIZE = 8192;
+
+  public CompressedFileWriterPolicy() {
+    super();
+  }
+  
+  public CompressedFileWriterPolicy(FileWriterFlushConfig<T> flushConfig,
+      FileWriterCycleConfig<T> cycleConfig,
+      FileWriterRetentionConfig retentionConfig) {
+    super(flushConfig, cycleConfig, retentionConfig);
+  }
+
+  @Override
+  protected Path hookGenerateFinalFilePath(Path path) {
+    // finalPath = the normal finalPath + SUFFIX
+    Path finalPath = super.hookGenerateFinalFilePath(path);
+    finalPath = finalPath.getParent().resolve(finalPath.getFileName() + SUFFIX);
+    return finalPath;
+  }
+
+  @Override
+  protected void hookRenameFile(Path activePath, Path finalPath) throws IOException {
+    // compress into finalPath instead of simple rename
+    assert finalPath.toString().endsWith(SUFFIX) : finalPath.toString();
+    compressFile(activePath, finalPath);
+    activePath.toFile().delete();
+  }
+  
+  protected void compressFile(Path src, Path dst) throws IOException {
+    try (
+        BufferedInputStream in = new BufferedInputStream(
+                new FileInputStream(src.toFile()), BUFSIZE);
+        ZipOutputStream out = new ZipOutputStream(
+                new BufferedOutputStream(new FileOutputStream(dst.toFile())));
+        )
+    {
+      // zip file entry name is "dst" minus the suffix.
+      String dstFileName = dst.getFileName().toString();
+      String entryName = dstFileName.substring(0, dstFileName.length() - SUFFIX.length());
+      
+      out.putNextEntry(new ZipEntry(entryName));
+      byte[] data = new byte[BUFSIZE];
+      int count;
+      while ((count = in.read(data, 0, BUFSIZE)) != -1) {
+        out.write(data, 0, count);
+      }
+    }
+    
+  }
+
+}