You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:30 UTC
[29/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTest.java
deleted file mode 100644
index 5c1401c..0000000
--- a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.file;
-
-import static org.junit.Assume.assumeTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.AccessDeniedException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-import org.junit.Test;
-
-import edgent.connectors.file.FileStreams;
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.test.connectors.common.FileUtil;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-
-public class FileStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
-
- String[] stdLines = new String[] {
- "If you can keep your head when all about you",
- "Are losing theirs and blaming it on you,",
- "If you can trust yourself when all men doubt you,",
- "But make allowance for their doubting too;"
- };
-
- public String[] getLines() {
- return stdLines;
- }
-
- /**
- * Test that directory watcher creates the correct output.
- * @throws Exception on failure
- */
- @Test
- public void testDirectoryWatcherOrder() throws Exception {
- Topology t = newTopology("testDirectoryWatcherOrder");
-
- runDirectoryWatcher(t, 20, 1);
- }
-
- @Test
- public void testDirectoryWatcherOrderWithDelete() throws Exception {
- Topology t = newTopology("testDirectoryWatcherOrderWithDelete");
-
- runDirectoryWatcher(t, 20, 3);
- }
-
- @Test
- public void testDirectoryWatcherPreExisting() throws Exception {
- Topology t = newTopology("testDirectoryWatcherPreExisting");
-
- runDirectoryWatcher(t, 20, -1);
- }
-
- private void runDirectoryWatcher(Topology t, int numberOfFiles, int repeat) throws Exception {
-
- boolean preExistingMode = repeat < 0;
- repeat = Math.abs(repeat);
-
- System.out.println("##### "+t.getName());
- final Path dir = Files.createTempDirectory("testdw");
- final String[] files = new String[numberOfFiles];
- for (int i = 0; i < files.length; i++) {
- files[i] = dir.resolve("A" + (numberOfFiles - i)).toAbsolutePath()
- .toString();
- }
- List<String> expectedFileNames = new ArrayList<>();
- for (int r = 0; r < repeat; r++)
- expectedFileNames.addAll(Arrays.asList(files));
-
- if (preExistingMode) {
- // exercise the case where files exist when the watcher starts
- // also test that files starting with "." (hiddden files)
- // are ignored. Add the file here but not to the expected list.
- String[] filesWithHidden = Arrays.copyOf(files, files.length+1);
- File f = new File(files[0]);
- File hidden = new File(f.getParent(), f.getName().replaceFirst("^", "."));
- filesWithHidden[files.length] = hidden.toString();
- createFiles(filesWithHidden, repeat);
- }
- else {
- // Create the files from within the topology.
- //
- // Due to vagaries / delays that can occur in operator startup,
- // delay the initial file creation to give the watcher a chance to startup.
- //
- // e.g., with numberOfFiles=20 & repeat=1, each group of files
- // only lasts 20*(10ms*2) => 200ms. That can easily happen before
- // the watcher is started and has done its first dir.listFiles(),
- // with the result being not seeing/processing the expected number
- // of files.
-
- if (repeat > 1) {
- if ("Mac OS X".equals(System.getProperty("os.name"))) {
- // This test does delete/recreate too fast for this platform's
- // WatchService. See comments in FileStreams.directoryWatcher()
- // and in DirectoryWatcher.
- System.err.println("Test "+t.getName()+": sigh not on MacOS");
- assumeTrue(false);
- }
- }
-
- int finalRepeat = repeat;
- PlumbingStreams.blockingOneShotDelay(
- t.collection(Arrays.asList(0L)), 3, TimeUnit.SECONDS)
- .sink((beacon) -> createFiles(files, finalRepeat));
- }
-
- TStream<String> fileNames = FileStreams.directoryWatcher(t,
- () -> dir.toAbsolutePath().toString());
-
- try {
- // These tests require unordered validation because the
- // files are created only 10msec apart and the filesystem
- // and/or event system may not preserve the actual ordering
- // at that resolution.
-
- fileNames.sink(str -> System.out.println("got file "+str));
-
- completeAndValidate(false/*ordered*/, "", t, fileNames, 20,
- expectedFileNames.toArray(new String[0]));
- }
- finally {
- deleteFilesAndDir(dir, files);
- }
- }
-
- private void deleteFilesAndDir(final Path dir, final String[] files) {
- // Ensure we clean up!
- for (int i = 0; i < files.length; i++) {
- Path path = Paths.get(files[i]);
- path.toFile().delete();
- }
- dir.toFile().delete();
- }
-
- private void createFiles(String[] files, int repeat) {
- try {
- for (int r = 0; r < repeat; r++) {
- for (int i = 0; i < files.length; i++) {
- Path path = Paths.get(files[i]);
- if (r > 0) {
- path.toFile().delete();
- Thread.sleep(10);
- // System.out.println(new Date() + " deleted " + path.getFileName());
- }
- Files.createFile(path);
- Thread.sleep(10);
- // System.out.println(new Date() + " created " + path.getFileName());
- }
- }
- } catch (InterruptedException e) {
- // shutdown
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testTextFileReader() throws Exception {
- Topology t = newTopology("testTextFileReader");
-
- String[] lines = getLines();
- String[] ucLines = Stream.of(lines)
- .map(line -> line.toUpperCase())
- .toArray(String[]::new);
- String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
- .toArray(String[]::new);
-
- Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
- Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
-
- TStream<String> contents = FileStreams.textFileReader(
- t.strings(tempFile1.toAbsolutePath().toString(),
- tempFile2.toAbsolutePath().toString()));
-
- try {
- completeAndValidate("", t, contents, 10, allLines);
- }
- finally {
- tempFile1.toFile().delete();
- tempFile2.toFile().delete();
- }
- }
-
- @Test
- public void testTextFileReaderProblemPaths() throws Exception {
- Topology t = newTopology("testTextFileReaderProblemPaths");
-
- String[] lines = getLines();
- String[] ucLines = Stream.of(lines)
- .map(line -> line.toUpperCase())
- .toArray(String[]::new);
- String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
- .toArray(String[]::new);
-
- Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
- Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
-
- // ensure a problem in one file (tuple) doesn't affect others.
- // The problem files should result in a log entry but otherwise be ignored.
-
- TStream<String> contents = FileStreams.textFileReader(
- t.strings(tempFile1.toAbsolutePath().toString(),
- "/no-such-file",
- "/tmp",
- tempFile2.toAbsolutePath().toString()));
-
- try {
- completeAndValidate("", t, contents, 10, allLines);
- }
- finally {
- tempFile1.toFile().delete();
- tempFile2.toFile().delete();
- }
- }
-
- @Test
- public void testTextFileReaderPrePost() throws Exception {
- Topology t = newTopology("testTextFileReaderPrePost");
-
- String[] lines = getLines();
- String[] ucLines = Stream.of(lines)
- .map(line -> line.toUpperCase())
- .toArray(String[]::new);
-
- Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
- Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
-
- // Be insensitive to Windows path separators and "/tmp" location
- boolean isWindows = System.getProperty("os.name").startsWith("Windows");
- File tmpDir = File.createTempFile("anything", "anything");
- tmpDir.delete();
- tmpDir = tmpDir.getParentFile();
-
- Function<String,String> preFn
- = path -> String.format("[PRE-FUNCTION] path:%s", path);
- BiFunction<String,Exception,String> postFn
- = (path,exc) -> String.format("[POST-FUNCTION] path:%s exc=%s",
- path, Objects.toString(exc));
-
-
- List<String> allLines = new ArrayList<>();
- allLines.add(preFn.apply(tempFile1.toAbsolutePath().toString()));
- allLines.addAll(Arrays.asList(lines));
- allLines.add(postFn.apply(tempFile1.toAbsolutePath().toString(), null));
- //
- String noSuchFilePath = new File(tmpDir, "no-such-file").toString();
- allLines.add(preFn.apply(noSuchFilePath));
- allLines.add(postFn.apply(noSuchFilePath, new NoSuchFileException(noSuchFilePath)));
- //
- String tmpDirPath = tmpDir.toString();
- allLines.add(preFn.apply(tmpDirPath));
- allLines.add(postFn.apply(tmpDirPath,
- isWindows
- ? new AccessDeniedException(tmpDirPath)
- : new IOException("Is a directory")));
- //
- allLines.add(preFn.apply(tempFile2.toAbsolutePath().toString()));
- allLines.addAll(Arrays.asList(ucLines));
- allLines.add(postFn.apply(tempFile2.toAbsolutePath().toString(), null));
-
- TStream<String> contents = FileStreams.textFileReader(
- t.strings(tempFile1.toAbsolutePath().toString(),
- noSuchFilePath,
- tmpDirPath,
- tempFile2.toAbsolutePath().toString()),
- preFn, postFn
- );
-
- try {
- completeAndValidate("", t, contents, 10, allLines.toArray(new String[0]));
- }
- finally {
- tempFile1.toFile().delete();
- tempFile2.toFile().delete();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java b/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
deleted file mode 100644
index e1e5eed..0000000
--- a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.file;
-
-/**
- * FileStreamsTextFileWriter connector globalization tests.
- */
-public class FileStreamsTextFileWriterGlobalTest extends FileStreamsTextFileWriterTest {
-
- private static final String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
- private static final String[] globalLines = new String[] {
- "1-"+globalStr,
- "2-"+globalStr,
- "3-"+globalStr,
- "4-"+globalStr
- };
-
- public String getStr() {
- return globalStr;
- }
-
- public String[] getLines() {
- return globalLines;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java
deleted file mode 100644
index 8cef9be..0000000
--- a/connectors/file/src/test/java/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java
+++ /dev/null
@@ -1,945 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.file;
-
-import static edgent.test.connectors.common.FileUtil.createTempFile;
-
-//import static org.junit.Assume.assumeFalse;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import org.junit.Test;
-
-import edgent.connectors.file.CompressedFileWriterPolicy;
-import edgent.connectors.file.FileStreams;
-import edgent.connectors.file.FileWriterCycleConfig;
-import edgent.connectors.file.FileWriterFlushConfig;
-import edgent.connectors.file.FileWriterPolicy;
-import edgent.connectors.file.FileWriterRetentionConfig;
-import edgent.connectors.file.runtime.IFileWriterPolicy;
-import edgent.function.Predicate;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.topology.tester.Condition;
-
-public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implements DirectTestSetup {
-
- String str = "123456789";
- String[] stdLines = new String[] {
- "1-"+str,
- "2-"+str,
- "3-"+str,
- "4-"+str
- };
-
- private int TMO_SEC = 2;
-
- public String getStr() {
- return str;
- }
-
- public String[] getLines() {
- return stdLines;
- }
-
- @Test
- public void testFlushConfig() throws Exception {
- FileWriterFlushConfig<String> cfg;
-
- String trueTuple = "true";
- String falseTuple = "false";
- Predicate<String> p = tuple -> tuple.equals("true");
-
- cfg = FileWriterFlushConfig.newImplicitConfig();
- checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple);
-
- cfg = FileWriterFlushConfig.newCountBasedConfig(3);
- checkFileWriterConfig(cfg, 3, 0, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterFlushConfig.newCountBasedConfig(0));
-
- cfg = FileWriterFlushConfig.newTimeBasedConfig(10);
- checkFileWriterConfig(cfg, 0, 10, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterFlushConfig.newTimeBasedConfig(0));
-
- cfg = FileWriterFlushConfig.newPredicateBasedConfig(p);
- checkFileWriterConfig(cfg, 0, 0, p, trueTuple, falseTuple);
- expectIAE(() -> FileWriterFlushConfig.newPredicateBasedConfig(null));
-
- cfg = FileWriterFlushConfig.newConfig(1, 2, p);
- checkFileWriterConfig(cfg, 1, 2, p, trueTuple, falseTuple);
- cfg = FileWriterFlushConfig.newConfig(0, 0, null);
- checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterFlushConfig.newConfig(-1, 0, null));
- expectIAE(() -> FileWriterFlushConfig.newConfig(0, -1, null));
- }
-
- private static <T> void checkFileWriterConfig(FileWriterFlushConfig<T> cfg,
- int cntTuples, long periodMsec, Predicate<T> tuplePredicate,
- T trueTuple, T falseTuple) {
- assertEquals(cntTuples, cfg.getCntTuples());
- assertEquals(periodMsec, cfg.getPeriodMsec());
- assertEquals(tuplePredicate, cfg.getTuplePredicate());
- cfg.toString();
-
- int falseNTuples = cntTuples==1 ? 0 : cntTuples+1;
- int trueNTuples = 3*cntTuples;
-
- assertFalse("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(falseNTuples, falseTuple));
- if (cntTuples!=0)
- assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(trueNTuples, falseTuple));
- if (tuplePredicate!=null)
- assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(falseNTuples, trueTuple));
- }
-
- @Test
- public void testCycleConfig() throws Exception {
- FileWriterCycleConfig<String> cfg;
-
- String trueTuple = "true";
- String falseTuple = "false";
- Predicate<String> p = tuple -> tuple.equals("true");
-
- cfg = FileWriterCycleConfig.newFileSizeBasedConfig(2);
- checkFileWriterConfig(cfg, 2, 0, 0, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterCycleConfig.newFileSizeBasedConfig(0));
-
- cfg = FileWriterCycleConfig.newCountBasedConfig(3);
- checkFileWriterConfig(cfg, 0, 3, 0, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterCycleConfig.newCountBasedConfig(0));
-
- cfg = FileWriterCycleConfig.newTimeBasedConfig(10);
- checkFileWriterConfig(cfg, 0, 0, 10, null, trueTuple, falseTuple);
- expectIAE(() -> FileWriterCycleConfig.newTimeBasedConfig(0));
-
- cfg = FileWriterCycleConfig.newPredicateBasedConfig(p);
- checkFileWriterConfig(cfg, 0, 0, 0, p, trueTuple, falseTuple);
- expectIAE(() -> FileWriterCycleConfig.newPredicateBasedConfig(null));
-
- cfg = FileWriterCycleConfig.newConfig(1, 2, 3, p);
- checkFileWriterConfig(cfg, 1, 2, 3, p, trueTuple, falseTuple);
- expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, 0, null));
- expectIAE(() -> FileWriterCycleConfig.newConfig(-1, 0, 0, null));
- expectIAE(() -> FileWriterCycleConfig.newConfig(0, -1, 0, null));
- expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, -1, null));
- }
-
- private static <T> void checkFileWriterConfig(FileWriterCycleConfig<T> cfg,
- long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate,
- T trueTuple, T falseTuple) {
- assertEquals(fileSize, cfg.getFileSize());
- assertEquals(cntTuples, cfg.getCntTuples());
- assertEquals(periodMsec, cfg.getPeriodMsec());
- assertEquals(tuplePredicate, cfg.getTuplePredicate());
- cfg.toString();
-
- long falseFileSize = fileSize-1;
- long trueFileSize = fileSize+1;
- int falseNTuples = cntTuples==1 ? 0 : cntTuples+1;
- int trueNTuples = 3*cntTuples;
-
- assertFalse("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(falseFileSize, falseNTuples, falseTuple));
- if (fileSize!=0)
- assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(trueFileSize, trueNTuples, falseTuple));
- if (cntTuples!=0)
- assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(falseFileSize, trueNTuples, falseTuple));
- if (tuplePredicate!=null)
- assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
- cfg.evaluate(falseFileSize, falseNTuples, trueTuple));
- }
-
- @Test
- public void testRetentionConfig() throws Exception {
- FileWriterRetentionConfig cfg;
-
- cfg = FileWriterRetentionConfig.newFileCountBasedConfig(2);
- checkFileWriterConfig(cfg, 2, 0, 0, 0);
- expectIAE(() -> FileWriterRetentionConfig.newFileCountBasedConfig(0));
-
- cfg = FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(3);
- checkFileWriterConfig(cfg, 0, 3, 0, 0);
- expectIAE(() -> FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(0));
-
- cfg = FileWriterRetentionConfig.newAgeBasedConfig(10,11);
- checkFileWriterConfig(cfg, 0, 0, 10, 11);
- expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(0,1));
- expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(1,0));
-
- cfg = FileWriterRetentionConfig.newConfig(1, 2, 3, 0);
- checkFileWriterConfig(cfg, 1, 2, 3, 0);
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 0));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 1, 0));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 1));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(-1, 0, 0, 0));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, -1, 0, 0));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, -1, 0));
- expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, -1));
- }
-
- private void expectIAE(Runnable fn) {
- try {
- fn.run();
- fail("expected IAE");
- } catch (IllegalArgumentException e) { /* expected */ }
- }
-
- private static <T> void checkFileWriterConfig(FileWriterRetentionConfig cfg,
- int fileCnt, long aggSize, long ageSec, long periodMsec) {
- assertEquals(fileCnt, cfg.getFileCount());
- assertEquals(aggSize, cfg.getAggregateFileSize());
- assertEquals(ageSec, cfg.getAgeSec());
- assertEquals(periodMsec, cfg.getPeriodMsec());
- cfg.toString();
-
- int falseFileCnt = fileCnt-1;
- int trueFileCnt = fileCnt+1;
- long falseAggSize = aggSize-1;
- long trueAggSize = aggSize+1;
-
- assertFalse("fileCnt:"+fileCnt+" aggSize:"+aggSize,
- cfg.evaluate(falseFileCnt, falseAggSize));
- if (fileCnt!=0)
- assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize,
- cfg.evaluate(trueFileCnt, falseAggSize));
- if (aggSize!=0)
- assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize,
- cfg.evaluate(falseFileCnt, trueAggSize));
- }
-
- @Test
- public void testDefaultConfig() throws Exception {
- FileWriterPolicy<String> policy = new FileWriterPolicy<>();
- checkFileWriterConfig(policy.getFlushConfig(), 0, TimeUnit.SECONDS.toMillis(10), null, null, null);
- checkFileWriterConfig(policy.getCycleConfig(), 1*1024*1024, 0, 0, null, null, null);
- checkFileWriterConfig(policy.getRetentionConfig(), 10, 0, 0, 0);
- policy.toString();
- policy.close();
- }
-
- @Test
- public void testNoFilesCreated() throws Exception {
- // complete before any files are generated
- Topology t = newTopology("testNoFilesCreated");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- // build expected results
- List<List<String>> expResults = Collections.emptyList();
-
- TStream<String> s = t.events(eventSetup -> { /* no tuples generated */ });
-
- FileStreams.textFileWriter(s, () -> basePath.toString());
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testOneFileCreated() throws Exception {
- // all lines into a single (the first) file
- Topology t = newTopology("testOneFileCreated");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net all in one, the first, file
- List<List<String>> expResults = buildExpResults(lines, tuple -> false);
- assertEquals(1, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- // default writer policy
- TSink<String> sink = FileStreams.textFileWriter(s, () -> basePath.toString());
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
-
- assertNotNull(sink);
- }
-
- @Test
- public void testManyFiles() throws Exception {
- Topology t = newTopology("testManyFiles");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuples per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
-
- // in this config files are create very fast hence they end
- // up exercising the _<n> suffix to basePath_YYYYMMDD_HHMMSS
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(), // no extra flush
- FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testManyFilesSlow() throws Exception {
- Topology t = newTopology("testManyFilesSlow");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuples per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
-
- // add delay so we get different files w/o a _<n> suffix
-
- int throttleSec = 2;
- TStream<String> s = PlumbingStreams.blockingThrottle(
- t.strings(lines), throttleSec, TimeUnit.SECONDS);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(), // no extra flush
- FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
- basePath, expResults);
- }
-
- @Test
- public void testRetainCntBased() throws Exception {
- // more lines than configured retained numFiles; only keep the last numFiles
- Topology t = newTopology("testRetainCntBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuples per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
- int keepCnt = 2; // only keep the last n files
- for (int i = 0; i < keepCnt; i++)
- expResults.remove(0);
- assertEquals(keepCnt, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(1),
- FileWriterRetentionConfig.newFileCountBasedConfig(keepCnt)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testRetainAggSizeBased() throws Exception {
- // more aggsize than configured; only keep aggsize worth
- Topology t = newTopology("testRetainAggSizeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuple per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
- // agg size only enough for last two lines
- long aggregateFileSize = 2 * (("1-"+getStr()).getBytes().length + 1/*eol*/);
- expResults.remove(0);
- expResults.remove(0);
- assertEquals(2, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(1),
- FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(aggregateFileSize)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testRetainAgeBased() throws Exception {
- Topology t = newTopology("testRetainAgeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- int keepCnt = 2; // only keep the last n files with throttling, age,
- // and TMO_SEC
- int ageSec = 5;
- long periodMsec = TimeUnit.SECONDS.toMillis(1);
- // net one tuple per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
- for (int i = 0; i < keepCnt; i++)
- expResults.remove(0);
- assertEquals(keepCnt, expResults.size());
-
- // add delay so we can age out things
- //
- // After several runs this test seems reliable but
- // I suspect it may be fragile wrt timing hence the results.
- //
- // With 4 tuples, throttleDelay=2sec, and ageSec=5
- // t0=add-f1, t1, t2=add-f2, t3, t4=add-f3, t5-rm-f1, t6=add-f4, t7=rm-f2, t8, t9=rm-f3, ...
- //
- // So we want to check somewhere around t8 (after t7 and definitely before t9)
- // so all 4 files were created and the first 2 have been aged out.
- // with complete delay = #files-1*throttle + TMO_SEC, should be 6+2 == t8.
-
- int throttleSec = 2;
- TStream<String> s = PlumbingStreams.blockingThrottle(
- t.strings(lines), throttleSec, TimeUnit.SECONDS);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(1),
- FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, ((lines.length-1)*throttleSec)+TMO_SEC,
- basePath, expResults);
- }
-
- @Test
- public void testFlushImplicit() throws Exception {
- Topology t = newTopology("testFlushImplicit");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net all in one, the first, file
- List<List<String>> expResults = buildExpResults(lines, tuple -> false);
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(1000),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testFlushCntBased() throws Exception {
- Topology t = newTopology("testFlushCntBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net all in one, the first, file
- List<List<String>> expResults = buildExpResults(lines, tuple -> false);
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newCountBasedConfig(1), // every tuple
- FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testFlushTimeBased() throws Exception {
- Topology t = newTopology("testFlushTimeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net all in one, the first, file
- List<List<String>> expResults = buildExpResults(lines, tuple -> false);
-
- // add delay so time flush happens
-
- int throttleSec = 1;
- TStream<String> s = PlumbingStreams.blockingThrottle(
- t.strings(lines), throttleSec, TimeUnit.SECONDS);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
- FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
- basePath, expResults);
- }
-
- @Test
- public void testFlushTupleBased() throws Exception {
- Topology t = newTopology("testFlushTupleBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net all in one, the first, file
- List<List<String>> expResults = buildExpResults(lines, tuple -> false);
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newPredicateBasedConfig(
- tuple -> tuple.startsWith("1-") || tuple.startsWith("3-")),
- FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testCycleCntBased() throws Exception {
- Topology t = newTopology("testCycleCntBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net two tuples per file
- int cntTuples = 2;
- AtomicInteger cnt = new AtomicInteger();
- Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0;
- List<List<String>> expResults = buildExpResults(lines, cycleIt);
- assertEquals(lines.length / cntTuples, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(cntTuples),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testCycleSizeBased() throws Exception {
- Topology t = newTopology("testCycleSizeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuple per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
- int fileSize = 2;
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newFileSizeBasedConfig(fileSize),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testCycleTimeBased() throws Exception {
- Topology t = newTopology("testCycleTimeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net one tuple per file with 250msec cycle config and 1 throttle
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
-
- // add delay so time cycle happens
- // also verifies only cycle if there's something to cycle
- // (i.e., these cycles happen faster than tuples are written)
-
- int throttleSec = 1;
- TStream<String> s = PlumbingStreams.blockingThrottle(
- t.strings(lines), throttleSec, TimeUnit.SECONDS);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
- basePath, expResults);
- }
-
- @Test
- public void testCycleTupleBased() throws Exception {
- Topology t = newTopology("testCycleTupleBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // a tuple based config / predicate. in this case should end up with 3 files.
- Predicate<String> cycleIt = tuple -> tuple.startsWith("1-") || tuple.startsWith("3-");
- List<List<String>> expResults = buildExpResults(lines, cycleIt);
- assertEquals(3, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newPredicateBasedConfig(cycleIt),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testAllTimeBased() throws Exception {
- // exercise case with multiple timer based policies
- Topology t = newTopology("testAllTimeBased");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // keep all given age and TMO_SEC
- int ageSec = 10;
- long periodMsec = TimeUnit.SECONDS.toMillis(1);
- // net one tuple per file
- List<List<String>> expResults = buildExpResults(lines, tuple -> true);
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
- FileWriterCycleConfig.newConfig(1, 2000, TimeUnit.SECONDS.toMillis(10), null),
- FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- @Test
- public void testWriterWatcherReader() throws Exception {
- // verify all the pieces work together
- Topology t = newTopology("testWriterWatcherReader");
-
- String testDirPrefix = "testWriterWatcherReader";
- Path dir = Files.createTempDirectory(testDirPrefix);
- Path basePath = dir.resolve("writerCreated");
-
- String[] lines = getLines();
-
- System.out.println("########## "+t.getName());
-
- // Write the files
- // add delay so watcher starts first
- int throttleSec = 2;
- TStream<String> contents = PlumbingStreams.blockingThrottle(
- t.strings(lines), throttleSec, TimeUnit.SECONDS);
-
- IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(1), // one per file
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(contents, () -> basePath.toString(), () -> policy);
-
- // Watch and read contents
- TStream<String> pathnames = FileStreams.directoryWatcher(t,
- () -> dir.toAbsolutePath().toString());
- pathnames.sink(tuple -> System.out.println("watcher added "+tuple));
- pathnames.peek(tuple -> { if (new File(tuple).getName().startsWith("."))
- throw new RuntimeException("Not filtering active/hidden files "+tuple); });
- TStream<String> readContents = FileStreams.textFileReader(pathnames);
-
- boolean dump = true;
- try {
- completeAndValidate("", t, readContents,
- (lines.length*throttleSec)+TMO_SEC, lines);
- dump = false;
- }
- finally {
- deleteDirAndFiles(dir, testDirPrefix, dump);
- }
- }
-
- @Test
- public void testCompressedFileWriterPolicy() throws Exception {
- Topology t = newTopology("testCompressedFileWriterPolicy");
-
- // establish a base path
- Path basePath = createTempFile("test1", "txt", new String[0]);
-
- String[] lines = getLines();
-
- // build expected results
- // net 2 tuples per file
- int cntTuples = 2;
- AtomicInteger cnt = new AtomicInteger();
- Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0;
- List<List<String>> expResults = buildExpResults(lines, cycleIt);
- assertEquals(lines.length / cntTuples, expResults.size());
-
- TStream<String> s = t.strings(lines);
-
- IFileWriterPolicy<String> policy = new CompressedFileWriterPolicy<String>(
- FileWriterFlushConfig.newImplicitConfig(),
- FileWriterCycleConfig.newCountBasedConfig(cntTuples),
- FileWriterRetentionConfig.newFileCountBasedConfig(10)
- );
- FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
-
- completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
- }
-
- private void deleteDirAndFiles(Path dir, String dirPrefix, boolean dump) {
- // exercise caution before removing all files in dir
- if (!dirPrefix.startsWith("test"))
- throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix);
- String leaf = dir.getFileName().toString();
- if (!leaf.startsWith(dirPrefix))
- throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix);
-
- // Ok, delete all the files in the dir and then the dir
- for (File file : dir.toFile().listFiles()) {
- if (dump)
- dumpFile(file);
- file.delete();
- }
- dir.toFile().delete();
- }
-
- private void dumpFile(File f) {
- System.out.println("<<<<< Dumping "+f);
- try {
- Path path = f.toPath();
- try (BufferedReader br = Files.newBufferedReader(path)) {
- br.lines().forEach(line -> System.out.println(line));
- }
- }
- catch (Exception e) {
- System.out.println("##### exception: " + e.getLocalizedMessage());
- }
- System.out.println(">>>>> DONE "+f);
- }
-
- private <T> List<List<T>> buildExpResults(T[] tuples, Predicate<T> cycleIt) {
- List<List<T>> expResults = new ArrayList<>();
- List<T> oneFile = null;
- for (T tuple : tuples) {
- if (oneFile==null) {
- oneFile = new ArrayList<>();
- expResults.add(oneFile);
- }
- oneFile.add(tuple);
- if (cycleIt.test(tuple))
- oneFile = null;
- }
- return expResults;
- }
-
- private <T> void completeAndValidateWriter(Topology t, int tmoSec,
- Path basePath, List<List<T>> expResults) throws Exception {
-
- try {
- // just let it run to the tmo before we check the file contents
- Condition<Object> tc = new Condition<Object>() {
- public boolean valid() { return false; }
- public Object getResult() { return null; }
- };
-
- complete(t, tc, tmoSec, TimeUnit.SECONDS);
-
- System.out.println("########## "+t.getName());
-
- // right number of files?
- List<Path> actFiles = getActFiles(basePath);
- System.out.println("actFiles: "+actFiles);
- assertEquals(actFiles.toString(), expResults.size(), actFiles.size());
-
- // do the file(s) have the right contents?
- System.out.println("expResults: "+expResults);
- int i = 0;
- for (List<T> expFile : expResults) {
- Path path = actFiles.get(i++);
- checkContents(path, expFile.toArray(new String[0]));
- }
- }
- finally {
- deleteAll(basePath);
- }
- }
-
- private void deleteAll(Path basePath) {
- Path parent = basePath.getParent();
- String baseLeaf = basePath.getFileName().toString();
- String[] actLeafs = parent.toFile().list(
- (dir,leaf) -> leaf.startsWith(baseLeaf));
- for (String leaf : actLeafs) {
- parent.resolve(leaf).toFile().delete();
- }
- }
-
- private List<Path> getActFiles(Path basePath) {
- List<Path> paths = new ArrayList<>();
- Path parent = basePath.getParent();
- String baseLeaf = basePath.getFileName().toString();
- String[] actLeafs = parent.toFile().list(
- (dir,leaf) -> leaf.startsWith(baseLeaf+"_"));
- Arrays.sort(actLeafs, (o1,o2) -> o1.compareTo(o2));
- for (String leaf : actLeafs) {
- paths.add(parent.resolve(leaf));
- }
- return paths;
- }
-
- private void checkContents(Path path, String[] lines) {
- if (path.getFileName().toString().endsWith(".zip")) {
- checkZipContents(path, lines);
- return;
- }
- System.out.println("checking file "+path);
- int lineCnt = 0;
- try (BufferedReader br = Files.newBufferedReader(path)) {
- for (String line : lines) {
- ++lineCnt;
- String actLine = br.readLine();
- assertEquals("path:"+path+" line "+lineCnt, line, actLine);
- }
- assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine());
- }
- catch (IOException e) {
- assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
- }
- }
-
- private void checkZipContents(Path path, String[] lines) {
- System.out.println("checking file "+path);
- String fileName = path.getFileName().toString();
- String entryName = fileName.substring(0, fileName.length() - ".zip".length());
- int lineCnt = 0;
- try (
- FileInputStream fis = new FileInputStream(path.toFile());
- ZipInputStream zin = new ZipInputStream((new BufferedInputStream(fis)));
- )
- {
- ZipEntry entry = zin.getNextEntry();
-
- assertEquals(entryName, entry.getName());
-
- BufferedReader br = new BufferedReader(new InputStreamReader(zin));
- for (String line : lines) {
- ++lineCnt;
- String actLine = br.readLine();
- assertEquals("path:"+path+" line "+lineCnt, line, actLine);
- }
- assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine());
- }
- catch (IOException e) {
- assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsGlobalTest.java b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsGlobalTest.java
new file mode 100644
index 0000000..dbadb16
--- /dev/null
+++ b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsGlobalTest.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.file;
+
+/**
+ * FileStreams connector globalization tests.
+ */
+public class FileStreamsGlobalTest extends FileStreamsTest {
+
+ private static final String[] globalLines = new String[] {
+ "\u5b78\u800c\u6642\u7fd2\u4e4b",
+ "\u4e0d\u4ea6\u8aaa\u4e4e"
+ };
+
+ public String[] getLines() {
+ return globalLines;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTest.java
new file mode 100644
index 0000000..0ea1fa6
--- /dev/null
+++ b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTest.java
@@ -0,0 +1,310 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.file;
+
+import static org.junit.Assume.assumeTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.test.connectors.common.FileUtil;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+public class FileStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
+
+ String[] stdLines = new String[] {
+ "If you can keep your head when all about you",
+ "Are losing theirs and blaming it on you,",
+ "If you can trust yourself when all men doubt you,",
+ "But make allowance for their doubting too;"
+ };
+
+ public String[] getLines() {
+ return stdLines;
+ }
+
+ /**
+ * Test that directory watcher creates the correct output.
+ * @throws Exception on failure
+ */
+ @Test
+ public void testDirectoryWatcherOrder() throws Exception {
+ Topology t = newTopology("testDirectoryWatcherOrder");
+
+ runDirectoryWatcher(t, 20, 1);
+ }
+
+ @Test
+ public void testDirectoryWatcherOrderWithDelete() throws Exception {
+ Topology t = newTopology("testDirectoryWatcherOrderWithDelete");
+
+ runDirectoryWatcher(t, 20, 3);
+ }
+
+ @Test
+ public void testDirectoryWatcherPreExisting() throws Exception {
+ Topology t = newTopology("testDirectoryWatcherPreExisting");
+
+ runDirectoryWatcher(t, 20, -1);
+ }
+
+ private void runDirectoryWatcher(Topology t, int numberOfFiles, int repeat) throws Exception {
+
+ boolean preExistingMode = repeat < 0;
+ repeat = Math.abs(repeat);
+
+ System.out.println("##### "+t.getName());
+ final Path dir = Files.createTempDirectory("testdw");
+ final String[] files = new String[numberOfFiles];
+ for (int i = 0; i < files.length; i++) {
+ files[i] = dir.resolve("A" + (numberOfFiles - i)).toAbsolutePath()
+ .toString();
+ }
+ List<String> expectedFileNames = new ArrayList<>();
+ for (int r = 0; r < repeat; r++)
+ expectedFileNames.addAll(Arrays.asList(files));
+
+ if (preExistingMode) {
+ // exercise the case where files exist when the watcher starts
+ // also test that files starting with "." (hiddden files)
+ // are ignored. Add the file here but not to the expected list.
+ String[] filesWithHidden = Arrays.copyOf(files, files.length+1);
+ File f = new File(files[0]);
+ File hidden = new File(f.getParent(), f.getName().replaceFirst("^", "."));
+ filesWithHidden[files.length] = hidden.toString();
+ createFiles(filesWithHidden, repeat);
+ }
+ else {
+ // Create the files from within the topology.
+ //
+ // Due to vagaries / delays that can occur in operator startup,
+ // delay the initial file creation to give the watcher a chance to startup.
+ //
+ // e.g., with numberOfFiles=20 & repeat=1, each group of files
+ // only lasts 20*(10ms*2) => 200ms. That can easily happen before
+ // the watcher is started and has done its first dir.listFiles(),
+ // with the result being not seeing/processing the expected number
+ // of files.
+
+ if (repeat > 1) {
+ if ("Mac OS X".equals(System.getProperty("os.name"))) {
+ // This test does delete/recreate too fast for this platform's
+ // WatchService. See comments in FileStreams.directoryWatcher()
+ // and in DirectoryWatcher.
+ System.err.println("Test "+t.getName()+": sigh not on MacOS");
+ assumeTrue(false);
+ }
+ }
+
+ int finalRepeat = repeat;
+ PlumbingStreams.blockingOneShotDelay(
+ t.collection(Arrays.asList(0L)), 3, TimeUnit.SECONDS)
+ .sink((beacon) -> createFiles(files, finalRepeat));
+ }
+
+ TStream<String> fileNames = FileStreams.directoryWatcher(t,
+ () -> dir.toAbsolutePath().toString());
+
+ try {
+ // These tests require unordered validation because the
+ // files are created only 10msec apart and the filesystem
+ // and/or event system may not preserve the actual ordering
+ // at that resolution.
+
+ fileNames.sink(str -> System.out.println("got file "+str));
+
+ completeAndValidate(false/*ordered*/, "", t, fileNames, 20,
+ expectedFileNames.toArray(new String[0]));
+ }
+ finally {
+ deleteFilesAndDir(dir, files);
+ }
+ }
+
+ private void deleteFilesAndDir(final Path dir, final String[] files) {
+ // Ensure we clean up!
+ for (int i = 0; i < files.length; i++) {
+ Path path = Paths.get(files[i]);
+ path.toFile().delete();
+ }
+ dir.toFile().delete();
+ }
+
+ private void createFiles(String[] files, int repeat) {
+ try {
+ for (int r = 0; r < repeat; r++) {
+ for (int i = 0; i < files.length; i++) {
+ Path path = Paths.get(files[i]);
+ if (r > 0) {
+ path.toFile().delete();
+ Thread.sleep(10);
+ // System.out.println(new Date() + " deleted " + path.getFileName());
+ }
+ Files.createFile(path);
+ Thread.sleep(10);
+ // System.out.println(new Date() + " created " + path.getFileName());
+ }
+ }
+ } catch (InterruptedException e) {
+ // shutdown
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testTextFileReader() throws Exception {
+ Topology t = newTopology("testTextFileReader");
+
+ String[] lines = getLines();
+ String[] ucLines = Stream.of(lines)
+ .map(line -> line.toUpperCase())
+ .toArray(String[]::new);
+ String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
+ .toArray(String[]::new);
+
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
+
+ TStream<String> contents = FileStreams.textFileReader(
+ t.strings(tempFile1.toAbsolutePath().toString(),
+ tempFile2.toAbsolutePath().toString()));
+
+ try {
+ completeAndValidate("", t, contents, 10, allLines);
+ }
+ finally {
+ tempFile1.toFile().delete();
+ tempFile2.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testTextFileReaderProblemPaths() throws Exception {
+ Topology t = newTopology("testTextFileReaderProblemPaths");
+
+ String[] lines = getLines();
+ String[] ucLines = Stream.of(lines)
+ .map(line -> line.toUpperCase())
+ .toArray(String[]::new);
+ String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
+ .toArray(String[]::new);
+
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
+
+ // ensure a problem in one file (tuple) doesn't affect others.
+ // The problem files should result in a log entry but otherwise be ignored.
+
+ TStream<String> contents = FileStreams.textFileReader(
+ t.strings(tempFile1.toAbsolutePath().toString(),
+ "/no-such-file",
+ "/tmp",
+ tempFile2.toAbsolutePath().toString()));
+
+ try {
+ completeAndValidate("", t, contents, 10, allLines);
+ }
+ finally {
+ tempFile1.toFile().delete();
+ tempFile2.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testTextFileReaderPrePost() throws Exception {
+ Topology t = newTopology("testTextFileReaderPrePost");
+
+ String[] lines = getLines();
+ String[] ucLines = Stream.of(lines)
+ .map(line -> line.toUpperCase())
+ .toArray(String[]::new);
+
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
+
+ // Be insensitive to Windows path separators and "/tmp" location
+ boolean isWindows = System.getProperty("os.name").startsWith("Windows");
+ File tmpDir = File.createTempFile("anything", "anything");
+ tmpDir.delete();
+ tmpDir = tmpDir.getParentFile();
+
+ Function<String,String> preFn
+ = path -> String.format("[PRE-FUNCTION] path:%s", path);
+ BiFunction<String,Exception,String> postFn
+ = (path,exc) -> String.format("[POST-FUNCTION] path:%s exc=%s",
+ path, Objects.toString(exc));
+
+
+ List<String> allLines = new ArrayList<>();
+ allLines.add(preFn.apply(tempFile1.toAbsolutePath().toString()));
+ allLines.addAll(Arrays.asList(lines));
+ allLines.add(postFn.apply(tempFile1.toAbsolutePath().toString(), null));
+ //
+ String noSuchFilePath = new File(tmpDir, "no-such-file").toString();
+ allLines.add(preFn.apply(noSuchFilePath));
+ allLines.add(postFn.apply(noSuchFilePath, new NoSuchFileException(noSuchFilePath)));
+ //
+ String tmpDirPath = tmpDir.toString();
+ allLines.add(preFn.apply(tmpDirPath));
+ allLines.add(postFn.apply(tmpDirPath,
+ isWindows
+ ? new AccessDeniedException(tmpDirPath)
+ : new IOException("Is a directory")));
+ //
+ allLines.add(preFn.apply(tempFile2.toAbsolutePath().toString()));
+ allLines.addAll(Arrays.asList(ucLines));
+ allLines.add(postFn.apply(tempFile2.toAbsolutePath().toString(), null));
+
+ TStream<String> contents = FileStreams.textFileReader(
+ t.strings(tempFile1.toAbsolutePath().toString(),
+ noSuchFilePath,
+ tmpDirPath,
+ tempFile2.toAbsolutePath().toString()),
+ preFn, postFn
+ );
+
+ try {
+ completeAndValidate("", t, contents, 10, allLines.toArray(new String[0]));
+ }
+ finally {
+ tempFile1.toFile().delete();
+ tempFile2.toFile().delete();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
new file mode 100644
index 0000000..de31189
--- /dev/null
+++ b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.file;
+
+/**
+ * FileStreamsTextFileWriter connector globalization tests.
+ */
+public class FileStreamsTextFileWriterGlobalTest extends FileStreamsTextFileWriterTest {
+
+ private static final String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
+ private static final String[] globalLines = new String[] {
+ "1-"+globalStr,
+ "2-"+globalStr,
+ "3-"+globalStr,
+ "4-"+globalStr
+ };
+
+ public String getStr() {
+ return globalStr;
+ }
+
+ public String[] getLines() {
+ return globalLines;
+ }
+
+}