You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/27 18:04:48 UTC
[03/10] incubator-nifi git commit: NIFI-527: Merging develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
new file mode 100644
index 0000000..6f85b94
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link StandardRecordWriter} and {@link StandardRecordReader}:
+ * events are written to a journal file (plus its table-of-contents file) under
+ * {@code target/storage/<random uuid>/} and then read back and verified.
+ */
+public class TestStandardRecordReaderWriter {
+    @BeforeClass
+    public static void setLogLevel() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+
+    /**
+     * Builds a RECEIVE event with a fixed transit URI ("nifi://unit-test"), a fixed
+     * component id/type and a FlowFile carrying a "filename" and a random "uuid" attribute.
+     *
+     * @return a freshly built provenance event
+     */
+    private ProvenanceEventRecord createEvent() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        return builder.build();
+    }
+
+    /**
+     * Writes the journal header followed by {@code eventCount} events whose ids are
+     * {@code firstEventId}, {@code firstEventId + 1}, ... and always closes the writer,
+     * even when writing fails.
+     *
+     * @param journalFile the journal file to write
+     * @param tocFile the table-of-contents file that accompanies the journal
+     * @param compressed passed to the writer as its compression flag
+     * @param uncompressedBlockSize passed to the writer as the fourth constructor argument;
+     *            per the tests below this is the amount of uncompressed data per block
+     * @param firstEventId id given to the first event written
+     * @param eventCount number of events to write
+     * @throws IOException if the journal or TOC cannot be written
+     */
+    private void writeEvents(final File journalFile, final File tocFile, final boolean compressed,
+            final int uncompressedBlockSize, final long firstEventId, final int eventCount) throws IOException {
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, compressed, uncompressedBlockSize);
+
+        try {
+            writer.writeHeader();
+            for (int i = 0; i < eventCount; i++) {
+                writer.writeRecord(createEvent(), firstEventId + i);
+            }
+        } finally {
+            // close even on failure so the file handle is released before cleanup
+            writer.close();
+        }
+    }
+
+    /**
+     * Reads the journal back and verifies that it holds exactly one event, located in
+     * block 0, with the transit URI set by {@link #createEvent()}.
+     *
+     * @param journalFile the journal file to read
+     * @param tocFile the table-of-contents file that accompanies the journal
+     * @throws IOException if the journal or TOC cannot be read
+     */
+    private void assertSingleEventReadBack(final File journalFile, final File tocFile) throws IOException {
+        final TocReader tocReader = new StandardTocReader(tocFile);
+
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            assertEquals(0, reader.getBlockIndex());
+            reader.skipToBlock(0);
+            final StandardProvenanceEventRecord recovered = reader.nextRecord();
+            assertNotNull(recovered);
+
+            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    /**
+     * Recursively deletes the per-test storage directory (the journal's parent).
+     * Does nothing if the directory was never created, so that a cleanup failure
+     * does not hide an earlier setup failure.
+     *
+     * @param journalFile the journal file whose parent directory is removed
+     * @throws IOException if the directory exists but cannot be deleted
+     */
+    private void deleteStorage(final File journalFile) throws IOException {
+        final File storageDir = journalFile.getParentFile();
+        if (storageDir.exists()) {
+            FileUtils.deleteFile(storageDir, true);
+        }
+    }
+
+    @Test
+    public void testSimpleWriteWithToc() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+
+        try {
+            writeEvents(journalFile, tocFile, false, 1024 * 1024, 1L, 1);
+            assertSingleEventReadBack(journalFile, tocFile);
+        } finally {
+            deleteStorage(journalFile);
+        }
+    }
+
+
+    @Test
+    public void testSingleRecordCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+
+        try {
+            writeEvents(journalFile, tocFile, true, 100, 1L, 1);
+            assertSingleEventReadBack(journalFile, tocFile);
+        } finally {
+            deleteStorage(journalFile);
+        }
+    }
+
+
+    @Test
+    public void testMultipleRecordsSameBlockCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+
+        try {
+            // new block each 1 MB of uncompressed data, so all 10 events land in block 0
+            writeEvents(journalFile, tocFile, true, 1024 * 1024, 0L, 10);
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                for (int i = 0; i < 10; i++) {
+                    assertEquals(0, reader.getBlockIndex());
+
+                    // call skipToBlock on the first six iterations to ensure that we can; avoid
+                    // calling it on the remaining ones to ensure that it's okay not to.
+                    if (i <= 5) {
+                        reader.skipToBlock(0);
+                    }
+
+                    final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                    assertNotNull(recovered);
+                    assertEquals("nifi://unit-test", recovered.getTransitUri());
+                }
+
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            deleteStorage(journalFile);
+        }
+    }
+
+
+    @Test
+    public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+
+        try {
+            // new block each 100 bytes
+            writeEvents(journalFile, tocFile, true, 100, 0L, 10);
+
+            final TocReader tocReader = new StandardTocReader(tocFile);
+
+            try (final FileInputStream fis = new FileInputStream(journalFile);
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+                for (int i = 0; i < 10; i++) {
+                    final StandardProvenanceEventRecord recovered = reader.nextRecord();
+                    assertNotNull(recovered);
+                    assertEquals((long) i, recovered.getEventId());
+                    assertEquals("nifi://unit-test", recovered.getTransitUri());
+                }
+
+                assertNull(reader.nextRecord());
+            }
+        } finally {
+            deleteStorage(journalFile);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
new file mode 100644
index 0000000..7459fe8
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Test helper that produces minimal {@link FlowFile} instances for provenance tests.
+ */
+public class TestUtil {
+    /**
+     * Creates a FlowFile stub with the given id and size and a private copy of the
+     * given attributes.
+     *
+     * @param id value returned by {@link FlowFile#getId()}
+     * @param fileSize value returned by {@link FlowFile#getSize()}
+     * @param attributes attributes to copy into the stub
+     * @return a FlowFile backed by a copy of {@code attributes}
+     */
+    public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+        return new StubFlowFile(id, fileSize, new HashMap<>(attributes));
+    }
+
+    /**
+     * Fixed-value FlowFile: id, size and attributes come from the constructor, every
+     * date accessor reports the current time, and the lineage identifier set is empty.
+     */
+    private static final class StubFlowFile implements FlowFile {
+        private final long flowFileId;
+        private final long contentSize;
+        private final Map<String, String> attributeMap;
+
+        private StubFlowFile(final long flowFileId, final long contentSize, final Map<String, String> attributeMap) {
+            this.flowFileId = flowFileId;
+            this.contentSize = contentSize;
+            this.attributeMap = attributeMap;
+        }
+
+        @Override
+        public long getId() {
+            return flowFileId;
+        }
+
+        @Override
+        public long getSize() {
+            return contentSize;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return attributeMap;
+        }
+
+        @Override
+        public String getAttribute(final String key) {
+            return attributeMap.get(key);
+        }
+
+        @Override
+        public long getEntryDate() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return new HashSet<String>();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        // every stub compares as equal to any other FlowFile
+        @Override
+        public int compareTo(final FlowFile other) {
+            return 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
new file mode 100644
index 0000000..30326e7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link StandardTocReader} that hand-craft table-of-contents files:
+ * two leading bytes followed by a sequence of 8-byte block offsets.
+ */
+public class TestStandardTocReader {
+
+    /**
+     * Verifies that a second header byte of 0 is reported as not compressed and a
+     * second header byte of 1 is reported as compressed.
+     */
+    @Test
+    public void testDetectsCompression() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+
+        try {
+            try (final OutputStream out = new FileOutputStream(file)) {
+                out.write(0);
+                out.write(0);
+            }
+
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+            }
+        } finally {
+            // best-effort cleanup; also runs if writing the fixture fails
+            file.delete();
+        }
+
+
+        try {
+            try (final OutputStream out = new FileOutputStream(file)) {
+                out.write(0);
+                out.write(1);
+            }
+
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertTrue(reader.isCompressed());
+            }
+        } finally {
+            file.delete();
+        }
+    }
+
+
+    /**
+     * Writes 1024 block offsets (i * 1024 for block i) after the two header bytes and
+     * verifies that {@link StandardTocReader#getBlockOffset(int)} returns each of them.
+     */
+    @Test
+    public void testGetBlockIndex() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+
+        try {
+            try (final OutputStream out = new FileOutputStream(file);
+                final DataOutputStream dos = new DataOutputStream(out)) {
+                // write the header through the same stream as the offsets so that byte
+                // order in the file never depends on how the wrapper forwards its writes
+                dos.write(0);
+                dos.write(0);
+
+                for (int i = 0; i < 1024; i++) {
+                    dos.writeLong(i * 1024L);
+                }
+            }
+
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+
+                for (int i = 0; i < 1024; i++) {
+                    // long arithmetic, matching the values written above
+                    assertEquals(i * 1024L, reader.getBlockOffset(i));
+                }
+            }
+        } finally {
+            file.delete();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..70f55a2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StandardTocWriter}.
+ */
+public class TestStandardTocWriter {
+    /**
+     * Creates an empty file and verifies that a {@link StandardTocWriter} can be
+     * opened on it and closed again without throwing.
+     */
+    @Test
+    public void testOverwriteEmptyFile() throws IOException {
+        final String randomName = UUID.randomUUID().toString();
+        final File emptyToc = new File("target/" + randomName + ".toc");
+
+        try {
+            final boolean createdFresh = emptyToc.createNewFile();
+            assertTrue(createdFresh);
+
+            try (final StandardTocWriter tocWriter = new StandardTocWriter(emptyToc, false, false)) {
+                // nothing to write: constructing and closing the writer is the test
+            }
+        } finally {
+            FileUtils.deleteFile(emptyToc, false);
+        }
+    }
+
+}