You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/27 18:04:48 UTC

[03/10] incubator-nifi git commit: NIFI-527: Merging develop

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
new file mode 100644
index 0000000..6f85b94
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Round-trip tests that write events with {@link StandardRecordWriter} and read them back with {@link StandardRecordReader}. */
+public class TestStandardRecordReaderWriter {
+    @BeforeClass
+    public static void setLogLevel() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+
+    /** Builds a RECEIVE event with transit URI "nifi://unit-test" and a random "uuid" attribute. */
+    private ProvenanceEventRecord createEvent() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        return builder.build();
+    }
+
+    /** Writes one record to an uncompressed journal and expects to read exactly that record back. */
+    @Test
+    public void testSimpleWriteWithToc() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+        try {
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+            final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
+            writer.writeHeader();
+            writer.writeRecord(createEvent(), 1L);
+            writer.close();
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                    final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                assertEquals(0, reader.getBlockIndex());
+                reader.skipToBlock(0);
+                final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                assertNotNull(recovered);
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            // delete the journal directory even when an assertion above fails
+            FileUtils.deleteFile(journalFile.getParentFile(), true);
+        }
+    }
+
+    /** Same round trip as the simple test, but with compression switched on and a 100-byte block size. */
+    @Test
+    public void testSingleRecordCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        try {
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+            final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+            writer.writeHeader();
+            writer.writeRecord(createEvent(), 1L);
+            writer.close();
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                    final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                assertEquals(0, reader.getBlockIndex());
+                reader.skipToBlock(0);
+                final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                assertNotNull(recovered);
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            // delete the journal directory even when an assertion above fails
+            FileUtils.deleteFile(journalFile.getParentFile(), true);
+        }
+    }
+
+    /** Writes ten compressed records that are all expected in block 0 and reads them back one by one. */
+    @Test
+    public void testMultipleRecordsSameBlockCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        try {
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+            // new block each 1 MB of uncompressed data, so the ten small records share block 0
+            final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
+            writer.writeHeader();
+            for (int i = 0; i < 10; i++) {
+                writer.writeRecord(createEvent(), i);
+            }
+            writer.close();
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                    final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                for (int i = 0; i < 10; i++) {
+                    assertEquals(0, reader.getBlockIndex());
+
+                    // call skipToBlock on the first six iterations to ensure that we can; avoid
+                    // calling it on the remaining four to ensure that reading works without it.
+                    if (i <= 5) {
+                        reader.skipToBlock(0);
+                    }
+
+                    final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                    assertNotNull(recovered);
+                    assertEquals("nifi://unit-test", recovered.getTransitUri());
+                }
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            // delete the journal directory even when an assertion above fails
+            FileUtils.deleteFile(journalFile.getParentFile(), true);
+        }
+    }
+
+    /** Writes ten compressed records with a 100-byte block size and checks that the event ids come back as 0..9. */
+    @Test
+    public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        try {
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+            // new block each 100 bytes of uncompressed data
+            final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+            writer.writeHeader();
+            for (int i = 0; i < 10; i++) {
+                writer.writeRecord(createEvent(), i);
+            }
+            writer.close();
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                    final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                for (int i = 0; i < 10; i++) {
+                    final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                    assertNotNull(recovered);
+                    assertEquals((long) i, recovered.getEventId());
+                    assertEquals("nifi://unit-test", recovered.getTransitUri());
+                }
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            // delete the journal directory even when an assertion above fails
+            FileUtils.deleteFile(journalFile.getParentFile(), true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
new file mode 100644
index 0000000..7459fe8
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/** Test helper that builds in-memory {@link FlowFile} stubs. */
+public class TestUtil {
+    /**
+     * Creates a FlowFile stub whose attributes are a copy of {@code attributes}; {@code id} and
+     * {@code fileSize} are what {@code getId()} and {@code getSize()} return. Every date getter
+     * returns the current time on each call, and {@code compareTo} always returns 0.
+     */
+    public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+        final Map<String, String> snapshot = new HashMap<>(attributes);
+
+        return new FlowFile() {
+            @Override
+            public long getId() {
+                return id;
+            }
+            @Override
+            public long getSize() {
+                return fileSize;
+            }
+
+            @Override
+            public long getEntryDate() {
+                return System.currentTimeMillis();
+            }
+            @Override
+            public long getLineageStartDate() {
+                return System.currentTimeMillis();
+            }
+            @Override
+            public Long getLastQueueDate() {
+                return System.currentTimeMillis();
+            }
+
+            @Override
+            public String getAttribute(final String name) {
+                return snapshot.get(name);
+            }
+            @Override
+            public Map<String, String> getAttributes() {
+                return snapshot;
+            }
+
+            @Override
+            public Set<String> getLineageIdentifiers() {
+                return new HashSet<String>();
+            }
+            @Override
+            public boolean isPenalized() {
+                return false;
+            }
+            @Override
+            public int compareTo(final FlowFile other) {
+                return 0;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
new file mode 100644
index 0000000..30326e7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import org.junit.Test;
+
+public class TestStandardTocReader {
+
+    @Test
+    public void testDetectsCompression() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write(0);
+            out.write(0);
+        }
+        
+        try {
+            try(final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+            }
+        } finally {
+            file.delete();
+        }
+        
+        
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write(0);
+            out.write(1);
+        }
+        
+        try {
+            try(final StandardTocReader reader = new StandardTocReader(file)) {
+                assertTrue(reader.isCompressed());
+            }
+        } finally {
+            file.delete();
+        }
+    }
+    
+    
+    @Test
+    public void testGetBlockIndex() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        try (final OutputStream out = new FileOutputStream(file);
+             final DataOutputStream dos = new DataOutputStream(out)) {
+            out.write(0);
+            out.write(0);
+            
+            for (int i=0; i < 1024; i++) {
+                dos.writeLong(i * 1024L);
+            }
+        }
+        
+        try {
+            try(final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+                
+                for (int i=0; i < 1024; i++) {
+                    assertEquals(i * 1024, reader.getBlockOffset(i));
+                }
+            }
+        } finally {
+            file.delete();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..70f55a2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Test;
+
+/** Tests for {@link StandardTocWriter}. */
+public class TestStandardTocWriter {
+    /** Opening a writer on an existing, zero-length file and closing it again must not throw. */
+    @Test
+    public void testOverwriteEmptyFile() throws IOException {
+        final File emptyToc = new File("target/" + UUID.randomUUID().toString() + ".toc");
+        try {
+            final boolean created = emptyToc.createNewFile();
+            assertTrue(created);
+            new StandardTocWriter(emptyToc, false, false).close();
+        } finally {
+            FileUtils.deleteFile(emptyToc, false);
+        }
+    }
+}