You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/20 05:09:43 UTC
[3/5] nifi git commit: NIFI-3273: Added nifi-toolkit-flowfile-repo
that contains a simple Java class that is capable of recovering a FlowFile
Repository manually in the case of an operating system crash that results in
trailing 0's being dumped into the
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
new file mode 100644
index 0000000..afa19ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * Presents a {@link ResourceClaim} as a schema {@link Record}, keyed by the ContentClaimSchema field names.
+ */
+public class ResourceClaimFieldMap implements Record {
+    private final ResourceClaim resourceClaim;
+    private final RecordSchema schema;
+
+    /** Wraps {@code resourceClaim}; {@code schema} is what {@link #getSchema()} reports for this record. */
+    public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) {
+        this.schema = schema;
+        this.resourceClaim = resourceClaim;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /** Returns the claim property stored under {@code fieldName}, or {@code null} for an unrecognized name. */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        if (fieldName.equals(ContentClaimSchema.CLAIM_CONTAINER)) {
+            return resourceClaim.getContainer();
+        } else if (fieldName.equals(ContentClaimSchema.CLAIM_SECTION)) {
+            return resourceClaim.getSection();
+        } else if (fieldName.equals(ContentClaimSchema.CLAIM_IDENTIFIER)) {
+            return resourceClaim.getId();
+        } else if (fieldName.equals(ContentClaimSchema.LOSS_TOLERANT)) {
+            return resourceClaim.isLossTolerant();
+        }
+        return null;
+    }
+
+    /**
+     * Reads the four claim fields from {@code record} and asks {@code claimManager} for the matching claim.
+     */
+    public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) {
+        final String claimContainer = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+        final String claimSection = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+        final String claimId = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+        final Boolean tolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+        return claimManager.newResourceClaim(claimContainer, claimSection, claimId, tolerant, false);
+    }
+
+    @Override
+    public int hashCode() {
+        return 91 * resourceClaim.hashCode() + 41;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != ResourceClaimFieldMap.class) {
+            return false;
+        }
+        return resourceClaim.equals(((ResourceClaimFieldMap) obj).resourceClaim);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
new file mode 100644
index 0000000..59b0e7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Round-trip tests for {@link SchemaRepositoryRecordSerde}. Each test serializes a single record into an
+ * in-memory stream, either with the V1 schemas or with the header the serde writes itself, then reads the
+ * record back and compares its attributes with the ones that were written.
+ */
+public class SchemaRepositoryRecordSerdeTest {
+    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier";
+
+    /** Length of the attribute name/value used by the "long attribute" tests. */
+    private static final int LONG_ATTRIBUTE_LENGTH = 65536;
+
+    private StandardResourceClaimManager resourceClaimManager;
+    private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde;
+    private Map<String, FlowFileQueue> queueMap;
+    private FlowFileQueue flowFileQueue;
+    private ByteArrayOutputStream byteArrayOutputStream;
+    private DataOutputStream dataOutputStream;
+
+    @Before
+    public void setup() {
+        resourceClaimManager = new StandardResourceClaimManager();
+        schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+        queueMap = new HashMap<>();
+        schemaRepositoryRecordSerde.setQueueMap(queueMap);
+        flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
+        byteArrayOutputStream = new ByteArrayOutputStream();
+        dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    }
+
+    @After
+    public void teardown() {
+        resourceClaimManager.purge();
+    }
+
+    // ---- CREATE records ----
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeName() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute(longString(), "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        assertNotEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeValue() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", longString());
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        assertNotEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute(longString(), "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
+
+        assertEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", longString());
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
+
+        assertEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testRoundTripCreateV1ToV2() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        assertEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    // ---- SWAP_IN records ----
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeName() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute(longString(), "testValue");
+        final StandardRepositoryRecord swapInRecord = createSwapInRecord(attributes);
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream,
+                RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        assertNotEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeValue() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", longString());
+        final StandardRepositoryRecord swapInRecord = createSwapInRecord(attributes);
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream,
+                RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        assertNotEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute(longString(), "testValue");
+        final StandardRepositoryRecord swapInRecord = createSwapInRecord(attributes);
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream);
+
+        assertEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", longString());
+        final StandardRepositoryRecord swapInRecord = createSwapInRecord(attributes);
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream);
+
+        assertEquals(attributes, readBackRecord().getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testRoundTripSwapInV1ToV2() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        final Map<String, String> attributes = singleAttribute("testName", "testValue");
+        final StandardRepositoryRecord swapInRecord = createSwapInRecord(attributes);
+        schemaRepositoryRecordSerde.serializeRecord(swapInRecord, dataOutputStream,
+                RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        final RepositoryRecord restored = readBackRecord();
+        assertEquals(attributes, restored.getCurrent().getAttributes());
+        assertEquals(SWAP_IN, restored.getType());
+    }
+
+    // ---- helpers ----
+
+    /** Builds a string of {@link #LONG_ATTRIBUTE_LENGTH} 'a' characters. */
+    private static String longString() {
+        final StringBuilder builder = new StringBuilder();
+        int appended = 0;
+        while (appended < LONG_ATTRIBUTE_LENGTH) {
+            builder.append('a');
+            appended++;
+        }
+        return builder.toString();
+    }
+
+    /** Returns a fresh mutable map holding exactly one attribute. */
+    private static Map<String, String> singleAttribute(final String name, final String value) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(name, value);
+        return attributes;
+    }
+
+    /**
+     * Creates a record for the given attributes, gives it the swap location "fake" and asserts that the
+     * record then reports the SWAP_IN type.
+     */
+    private StandardRepositoryRecord createSwapInRecord(final Map<String, String> attributes) {
+        final StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        return record;
+    }
+
+    /** Reads the header and then one record back from everything written to the output stream so far. */
+    private RepositoryRecord readBackRecord() throws IOException {
+        final DataInputStream in = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(in);
+        // Every test passes 2 as the second argument of deserializeRecord.
+        return schemaRepositoryRecordSerde.deserializeRecord(in, 2);
+    }
+
+    /** Flushes the output stream and opens an input stream over a copy of the bytes written so far. */
+    private DataInputStream createDataInputStream() throws IOException {
+        dataOutputStream.flush();
+        final byte[] written = byteArrayOutputStream.toByteArray();
+        return new DataInputStream(new ByteArrayInputStream(written));
+    }
+
+    /** Creates a record bound to the mock queue whose working FlowFile carries the given attributes. */
+    private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, String> attributes) {
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue);
+        builder.addAttributes(attributes);
+        record.setWorking(builder.build());
+        return record;
+    }
+
+    /** Creates a mocked queue that reports {@code identifier} and registers it in the queue map. */
+    private FlowFileQueue createMockQueue(String identifier) {
+        final FlowFileQueue queue = mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn(identifier);
+        queueMap.put(identifier, queue);
+        return queue;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 54d777f..6395e6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -57,6 +57,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-repository-models</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
@@ -136,6 +140,10 @@
<artifactId>nifi-write-ahead-log</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-flowfile-repo-serialization</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
deleted file mode 100644
index 44ed62d..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> {
- private Map<String, FlowFileQueue> flowFileQueueMap = null;
-
- protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
- this.flowFileQueueMap = queueMap;
- }
-
- protected Map<String, FlowFileQueue> getQueueMap() {
- return flowFileQueueMap;
- }
-
- protected FlowFileQueue getFlowFileQueue(final String queueId) {
- return flowFileQueueMap.get(queueId);
- }
-
- @Override
- public Long getRecordIdentifier(final RepositoryRecord record) {
- return record.getCurrent().getId();
- }
-
- @Override
- public UpdateType getUpdateType(final RepositoryRecord record) {
- switch (record.getType()) {
- case CONTENTMISSING:
- case DELETE:
- return UpdateType.DELETE;
- case CREATE:
- return UpdateType.CREATE;
- case UPDATE:
- return UpdateType.UPDATE;
- case SWAP_OUT:
- return UpdateType.SWAP_OUT;
- case SWAP_IN:
- return UpdateType.SWAP_IN;
- }
- return null;
- }
-
- @Override
- public String getLocation(final RepositoryRecord record) {
- return record.getSwapLocation();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
deleted file mode 100644
index c19fa94..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
-public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> {
- private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
- private final ResourceClaimManager resourceClaimManager;
- private Map<String, FlowFileQueue> flowFileQueueMap = null;
-
- public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
- this.resourceClaimManager = claimManager;
- }
-
- protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
- this.flowFileQueueMap = queueMap;
- }
-
- protected Map<String, FlowFileQueue> getQueueMap() {
- return flowFileQueueMap;
- }
-
- @Override
- public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
- if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
- final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
- serde.setQueueMap(flowFileQueueMap);
- return serde;
- }
-
- if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
- || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
- final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
- serde.setQueueMap(flowFileQueueMap);
- return serde;
- }
-
- throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
- }
-
- protected FlowFileQueue getFlowFileQueue(final String queueId) {
- return flowFileQueueMap.get(queueId);
- }
-
- @Override
- public Long getRecordIdentifier(final RepositoryRecord record) {
- return record.getCurrent().getId();
- }
-
- @Override
- public UpdateType getUpdateType(final RepositoryRecord record) {
- switch (record.getType()) {
- case CONTENTMISSING:
- case DELETE:
- return UpdateType.DELETE;
- case CREATE:
- return UpdateType.CREATE;
- case UPDATE:
- return UpdateType.UPDATE;
- case SWAP_OUT:
- return UpdateType.SWAP_OUT;
- case SWAP_IN:
- return UpdateType.SWAP_IN;
- }
- return null;
- }
-
- @Override
- public String getLocation(final RepositoryRecord record) {
- return record.getSwapLocation();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
deleted file mode 100644
index 221f8ce..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
-import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
-import org.apache.nifi.controller.repository.schema.FlowFileSchema;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
-import org.apache.nifi.repository.schema.FieldType;
-import org.apache.nifi.repository.schema.Record;
-import org.apache.nifi.repository.schema.RecordSchema;
-import org.apache.nifi.repository.schema.Repetition;
-import org.apache.nifi.repository.schema.SchemaRecordReader;
-import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.repository.schema.SimpleRecordField;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-// SerDe for RepositoryRecord objects that writes records through SchemaRecordWriter using the
-// V2 repository record schemas, and reads them back through SchemaRecordReader using the
-// schema obtained from the stream header in readHeader().
-public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
- private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
- // Value reported by getVersion().
- private static final int MAX_ENCODING_VERSION = 2;
-
- // Schema emitted by writeHeader().
- private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2;
- // Schema handed to RepositoryRecordFieldMap for the content claim portion of a record.
- private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
-
- private final ResourceClaimManager resourceClaimManager;
- // Schema read from the stream header; assigned in readHeader() and used by deserializeRecord().
- private volatile RecordSchema recoverySchema;
-
- /**
- * @param resourceClaimManager passed to ContentClaimFieldMap when content claims are rebuilt during deserialization
- */
- public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
- this.resourceClaimManager = resourceClaimManager;
- }
-
- /**
- * Writes the write schema to the given stream.
- */
- @Override
- public void writeHeader(final DataOutputStream out) throws IOException {
- writeSchema.writeTo(out);
- }
-
- /**
- * Serializes an edit by writing the new record state in full; the previous state is not used.
- */
- @Override
- public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException {
- serializeRecord(newRecordState, out);
- }
-
- /**
- * Chooses the V2 schema that matches the record's type and writes the record with it.
- * CONTENTMISSING records are written with the same schema as DELETE records.
- *
- * @throws IllegalArgumentException if the record type is not one of the handled types
- */
- @Override
- public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
- final RecordSchema schema;
- switch (record.getType()) {
- case CREATE:
- case UPDATE:
- schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2;
- break;
- case CONTENTMISSING:
- case DELETE:
- schema = RepositoryRecordSchema.DELETE_SCHEMA_V2;
- break;
- case SWAP_IN:
- schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2;
- break;
- case SWAP_OUT:
- schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2;
- break;
- default:
- throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
- }
-
- serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2);
- }
-
-
- /**
- * Wraps the record in a RepositoryRecordFieldMap (using the given per-type schema and the content claim schema),
- * wraps that in a RepositoryRecordUpdate for the given repository record schema, and writes it to the stream.
- */
- protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException {
- final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
- final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema);
- new SchemaRecordWriter().writeRecord(update, out);
- }
-
- /**
- * Reads a schema from the stream and keeps it as the schema used by later deserializeRecord() calls.
- */
- @Override
- public void readHeader(final DataInputStream in) throws IOException {
- recoverySchema = RecordSchema.readFrom(in);
- }
-
- /**
- * Deserializes an edit by reading a full record; the map of current record states is not used.
- */
- @Override
- public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
- return deserializeRecord(in, version);
- }
-
- /**
- * Reads one record with the recovery schema and converts it to a RepositoryRecord according to its action type.
- * The version argument is not used by this method.
- *
- * @return the deserialized record, or null when the reader returns no record
- * @throws IOException if the action type is an UpdateType constant that is not handled below
- */
- @Override
- public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
- final Record updateRecord = reader.readRecord(in);
- if (updateRecord == null) {
- // null may be returned by reader.readRecord() if it encounters end-of-stream
- return null;
- }
-
- // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
- // top level that indicates which type of record we have.
- final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2);
-
- final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
- // Enum valueOf throws IllegalArgumentException for a name with no matching constant, so the
- // default branch below is reached only for constants that exist but are not listed here.
- final UpdateType updateType = UpdateType.valueOf(actionType);
- switch (updateType) {
- case CREATE:
- return createRecord(record);
- case DELETE:
- return deleteRecord(record);
- case SWAP_IN:
- return swapInRecord(record);
- case SWAP_OUT:
- return swapOutRecord(record);
- case UPDATE:
- return updateRecord(record);
- default:
- throw new IOException("Found unrecognized Update Type '" + actionType + "'");
- }
- }
-
-
- /**
- * Builds a FlowFile record from the fields of the given record and pairs it with the queue looked up by the
- * record's queue identifier. The result is marked for abort when the queue identifier is blank or unknown.
- */
- @SuppressWarnings("unchecked")
- private StandardRepositoryRecord createRecord(final Record record) {
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID));
- ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE));
-
- final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE);
- final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
- // NOTE(review): these Long values are unboxed by the builder call; presumably the schema guarantees they are present - confirm.
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-
- final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
- final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- populateContentClaim(ffBuilder, record);
- ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
-
- ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES));
-
- final FlowFileRecord flowFileRecord = ffBuilder.build();
-
- final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
- final FlowFileQueue queue = getFlowFileQueue(queueId);
-
- final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, flowFileRecord);
- requireFlowFileQueue(repoRecord, queueId);
- return repoRecord;
- }
-
- /**
- * Marks the record for abort, with a warning, when the queue identifier is null or blank
- * or when the record has no original queue.
- */
- private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) {
- if (queueId == null || queueId.trim().isEmpty()) {
- logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent());
- repoRecord.markForAbort();
- } else if (repoRecord.getOriginalQueue() == null) {
- logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId);
- repoRecord.markForAbort();
- }
- }
-
- /**
- * Copies the content claim and its offset from the record into the builder; does nothing when the
- * record has no content claim field value.
- */
- private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) {
- final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
- if (claimMap == null) {
- return;
- }
-
- final Record claimRecord = (Record) claimMap;
- final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager);
- final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord);
-
- ffBuilder.contentClaim(contentClaim);
- ffBuilder.contentClaimOffset(offset);
- }
-
- /**
- * An update is rebuilt exactly like a create.
- */
- private RepositoryRecord updateRecord(final Record record) {
- return createRecord(record);
- }
-
- /**
- * Builds a record that carries only the FlowFile id, has no queue, and is marked for delete.
- */
- private RepositoryRecord deleteRecord(final Record record) {
- final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
- final FlowFileRecord flowFileRecord = ffBuilder.build();
-
- final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
- repoRecord.markForDelete();
- return repoRecord;
- }
-
- /**
- * Rebuilds the record as in createRecord(), then attaches the swap location read from the record.
- */
- private RepositoryRecord swapInRecord(final Record record) {
- final StandardRepositoryRecord repoRecord = createRecord(record);
- final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
- repoRecord.setSwapLocation(swapLocation);
-
- // NOTE(review): createRecord() has already run this check; it appears to be repeated because
- // setSwapLocation() may change the record's type and thereby drop an abort mark - confirm.
- final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
- requireFlowFileQueue(repoRecord, queueId);
- return repoRecord;
- }
-
- /**
- * Builds a record that carries only the FlowFile id, the queue looked up by the record's queue identifier,
- * and the swap location. No queue check is performed here.
- */
- private RepositoryRecord swapOutRecord(final Record record) {
- final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
- final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
- final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
- final FlowFileQueue queue = getFlowFileQueue(queueId);
-
- final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .id(recordId)
- .build();
-
- return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation);
- }
-
- /**
- * @return MAX_ENCODING_VERSION
- */
- @Override
- public int getVersion() {
- return MAX_ENCODING_VERSION;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
deleted file mode 100644
index a1d5173..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.builder.CompareToBuilder;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-/**
- * <p>
- * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.
- * </p>
- *
- * <b>Immutable - Thread Safe</b>
- *
- */
-public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
-
-    private final long id;
-    private final long entryDate;
-    private final long lineageStartDate;
-    private final long lineageStartIndex;
-    private final long size;
-    private final long penaltyExpirationMs;
-    private final Map<String, String> attributes;
-    private final ContentClaim claim;
-    private final long claimOffset;
-    private final long lastQueueDate;
-    private final long queueDateIndex;
-
-    /**
-     * Copies the builder's current values into a new record. The attribute map is taken by
-     * reference rather than copied; {@link Builder#build()} is responsible for making sure the
-     * builder never writes to that map again.
-     *
-     * @param builder the builder holding the values for the new record
-     */
-    private StandardFlowFileRecord(final Builder builder) {
-        this.id = builder.bId;
-        this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes;
-        this.entryDate = builder.bEntryDate;
-        this.lineageStartDate = builder.bLineageStartDate;
-        this.lineageStartIndex = builder.bLineageStartIndex;
-        this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
-        this.size = builder.bSize;
-        this.claim = builder.bClaim;
-        this.claimOffset = builder.bClaimOffset;
-        this.lastQueueDate = builder.bLastQueueDate;
-        this.queueDateIndex = builder.bQueueDateIndex;
-    }
-
-    @Override
-    public long getId() {
-        return id;
-    }
-
-    @Override
-    public long getEntryDate() {
-        return entryDate;
-    }
-
-    @Override
-    public long getLineageStartDate() {
-        return lineageStartDate;
-    }
-
-    @Override
-    public Long getLastQueueDate() {
-        return lastQueueDate;
-    }
-
-    /**
-     * @return {@code true} only when a positive penalty expiration time is set and that time is
-     *         later than the current system time
-     */
-    @Override
-    public boolean isPenalized() {
-        return penaltyExpirationMs > 0 && penaltyExpirationMs > System.currentTimeMillis();
-    }
-
-    @Override
-    public String getAttribute(final String key) {
-        return attributes.get(key);
-    }
-
-    @Override
-    public long getSize() {
-        return size;
-    }
-
-    /**
-     * @return an unmodifiable view of this record's attributes
-     */
-    @Override
-    public Map<String, String> getAttributes() {
-        return Collections.unmodifiableMap(this.attributes);
-    }
-
-    @Override
-    public ContentClaim getContentClaim() {
-        return this.claim;
-    }
-
-    @Override
-    public long getContentClaimOffset() {
-        return this.claimOffset;
-    }
-
-    @Override
-    public long getLineageStartIndex() {
-        return lineageStartIndex;
-    }
-
-    @Override
-    public long getQueueDateIndex() {
-        return queueDateIndex;
-    }
-
-    /**
-     * Provides the natural ordering for FlowFile objects which is based on their identifier.
-     *
-     * @param other other
-     * @return standard compare contract
-     */
-    @Override
-    public int compareTo(final FlowFile other) {
-        return new CompareToBuilder().append(id, other.getId()).toComparison();
-    }
-
-    /**
-     * Two FlowFiles are equal when their identifiers are equal; no other field is compared.
-     */
-    @Override
-    public boolean equals(final Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FlowFile)) {
-            return false;
-        }
-        final FlowFile otherRecord = (FlowFile) other;
-        return new EqualsBuilder().append(id, otherRecord.getId()).isEquals();
-    }
-
-    @Override
-    public String toString() {
-        final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
-        builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
-        builder.append("claim", claim == null ? "" : claim.toString());
-        builder.append("offset", claimOffset);
-        builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
-        return builder.toString();
-    }
-
-    /**
-     * Hash is derived from the identifier only, matching {@link #equals(Object)}.
-     */
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(7, 13).append(id).toHashCode();
-    }
-
-    /**
-     * Mutable builder for {@link StandardFlowFileRecord}. The attribute map is handled
-     * copy-on-write: a map obtained from another FlowFile, or one already handed to a built
-     * record, is copied before the builder modifies it.
-     */
-    public static final class Builder {
-
-        private long bId;
-        private long bEntryDate = System.currentTimeMillis();
-        private long bLineageStartDate = bEntryDate;
-        private long bLineageStartIndex = 0L;
-        private final Set<String> bLineageIdentifiers = new HashSet<>();
-        private long bPenaltyExpirationMs = -1L;
-        private long bSize = 0L;
-        private ContentClaim bClaim = null;
-        private long bClaimOffset = 0L;
-        private long bLastQueueDate = System.currentTimeMillis();
-        private long bQueueDateIndex = 0L;
-        private Map<String, String> bAttributes;
-        // true only while bAttributes is a map owned exclusively by this builder
-        private boolean bAttributesCopied = false;
-
-        public Builder id(final long id) {
-            bId = id;
-            return this;
-        }
-
-        public Builder entryDate(final long epochMs) {
-            bEntryDate = epochMs;
-            return this;
-        }
-
-        public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) {
-            bLineageStartDate = lineageStartDate;
-            bLineageStartIndex = lineageStartIndex;
-            return this;
-        }
-
-        public Builder penaltyExpirationTime(final long epochMilliseconds) {
-            bPenaltyExpirationMs = epochMilliseconds;
-            return this;
-        }
-
-        /**
-         * Sets the size; a negative value is ignored and leaves the current size unchanged.
-         */
-        public Builder size(final long bytes) {
-            if (bytes >= 0) {
-                bSize = bytes;
-            }
-            return this;
-        }
-
-        /**
-         * Returns an attribute map that is safe for this builder to modify, creating it or
-         * copying a shared one first when necessary.
-         */
-        private Map<String, String> initializeAttributes() {
-            if (bAttributes == null) {
-                bAttributes = new HashMap<>();
-                bAttributesCopied = true;
-            } else if (!bAttributesCopied) {
-                bAttributes = new HashMap<>(bAttributes);
-                bAttributesCopied = true;
-            }
-
-            return bAttributes;
-        }
-
-        /**
-         * Adds one attribute; the call is a no-op when either the key or the value is null.
-         */
-        public Builder addAttribute(final String key, final String value) {
-            if (key != null && value != null) {
-                initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value);
-            }
-            return this;
-        }
-
-        /**
-         * Adds all given attributes. Every key is validated before anything is stored, so a
-         * validation failure leaves the builder's attributes untouched. Entries with a null key
-         * or null value are skipped.
-         */
-        public Builder addAttributes(final Map<String, String> attributes) {
-            final Map<String, String> initializedAttributes = initializeAttributes();
-
-            if (null != attributes) {
-                for (final String key : attributes.keySet()) {
-                    FlowFile.KeyValidator.validateKey(key);
-                }
-                for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                    final String key = entry.getKey();
-                    final String value = entry.getValue();
-                    if (key != null && value != null) {
-                        initializedAttributes.put(key, value);
-                    }
-                }
-            }
-            return this;
-        }
-
-        /**
-         * Removes the named attributes; the UUID attribute is never removed.
-         */
-        public Builder removeAttributes(final String... keys) {
-            if (keys != null) {
-                for (final String key : keys) {
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    initializeAttributes().remove(key);
-                }
-            }
-            return this;
-        }
-
-        /**
-         * Removes the named attributes; the UUID attribute is never removed.
-         */
-        public Builder removeAttributes(final Set<String> keys) {
-            if (keys != null) {
-                for (final String key : keys) {
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    initializeAttributes().remove(key);
-                }
-            }
-            return this;
-        }
-
-        /**
-         * Removes every attribute whose key fully matches the pattern; the UUID attribute is
-         * never removed.
-         */
-        public Builder removeAttributes(final Pattern keyPattern) {
-            if (keyPattern != null) {
-                final Iterator<String> iterator = initializeAttributes().keySet().iterator();
-                while (iterator.hasNext()) {
-                    final String key = iterator.next();
-
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    if (keyPattern.matcher(key).matches()) {
-                        iterator.remove();
-                    }
-                }
-            }
-            return this;
-        }
-
-        public Builder contentClaim(final ContentClaim claim) {
-            this.bClaim = claim;
-            return this;
-        }
-
-        public Builder contentClaimOffset(final long offset) {
-            this.bClaimOffset = offset;
-            return this;
-        }
-
-        public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) {
-            this.bLastQueueDate = lastQueueDate;
-            this.bQueueDateIndex = queueDateIndex;
-            return this;
-        }
-
-        /**
-         * Copies every value from the given FlowFile into this builder; a null argument leaves
-         * the builder unchanged. The attribute map is taken by reference and copied lazily on
-         * the first modification.
-         */
-        public Builder fromFlowFile(final FlowFileRecord specFlowFile) {
-            if (specFlowFile == null) {
-                return this;
-            }
-            bId = specFlowFile.getId();
-            bEntryDate = specFlowFile.getEntryDate();
-            bLineageStartDate = specFlowFile.getLineageStartDate();
-            bLineageStartIndex = specFlowFile.getLineageStartIndex();
-            bLineageIdentifiers.clear();
-            bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
-            bSize = specFlowFile.getSize();
-            bAttributes = specFlowFile.getAttributes();
-            bAttributesCopied = false;
-            bClaim = specFlowFile.getContentClaim();
-            bClaimOffset = specFlowFile.getContentClaimOffset();
-            bLastQueueDate = specFlowFile.getLastQueueDate();
-            bQueueDateIndex = specFlowFile.getQueueDateIndex();
-
-            return this;
-        }
-
-        /**
-         * Creates a record from the builder's current values. The builder stays usable:
-         * later changes to it do not affect records that were already built.
-         *
-         * @return the new record
-         */
-        public FlowFileRecord build() {
-            final FlowFileRecord built = new StandardFlowFileRecord(this);
-            // The new record now shares bAttributes. Clearing the flag makes the next
-            // mutation copy the map first, so the built record stays immutable.
-            bAttributesCopied = false;
-            return built;
-        }
-    }
-
-    @Override
-    public long getPenaltyExpirationMillis() {
-        return penaltyExpirationMs;
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
deleted file mode 100644
index 8aa1caf..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.processor.Relationship;
-
-public class StandardRepositoryRecord implements RepositoryRecord {
-
-    private RepositoryRecordType type = null;
-    private FlowFileRecord workingFlowFileRecord = null;
-    private Relationship transferRelationship = null;
-    private FlowFileQueue destination = null;
-    private final FlowFileRecord originalFlowFileRecord;
-    private final FlowFileQueue originalQueue;
-    private String swapLocation;
-    private final Map<String, String> updatedAttributes = new HashMap<>();
-    private final Map<String, String> originalAttributes;
-    private List<ContentClaim> transientClaims;
-
-    /**
-     * Creates a CREATE record: there is no original FlowFile, only the queue it belongs to.
-     *
-     * @param originalQueue queue
-     */
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue) {
-        this(originalQueue, null);
-        this.type = RepositoryRecordType.CREATE;
-    }
-
-    /**
-     * Creates an UPDATE record for an existing FlowFile.
-     *
-     * @param originalQueue queue
-     * @param originalFlowFileRecord record
-     */
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) {
-        this(originalQueue, originalFlowFileRecord, null);
-        this.type = RepositoryRecordType.UPDATE;
-    }
-
-    /**
-     * Creates a SWAP_OUT record for the given FlowFile and swap location.
-     *
-     * @param originalQueue queue
-     * @param originalFlowFileRecord record; when null the original attributes are an empty map
-     * @param swapLocation swap location
-     */
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) {
-        this.originalQueue = originalQueue;
-        this.originalFlowFileRecord = originalFlowFileRecord;
-        this.type = RepositoryRecordType.SWAP_OUT;
-        this.swapLocation = swapLocation;
-        if (originalFlowFileRecord == null) {
-            this.originalAttributes = Collections.<String, String>emptyMap();
-        } else {
-            this.originalAttributes = originalFlowFileRecord.getAttributes();
-        }
-    }
-
-    @Override
-    public FlowFileQueue getDestination() {
-        return destination;
-    }
-
-    public void setDestination(final FlowFileQueue destination) {
-        this.destination = destination;
-    }
-
-    @Override
-    public RepositoryRecordType getType() {
-        return type;
-    }
-
-    FlowFileRecord getOriginal() {
-        return originalFlowFileRecord;
-    }
-
-    @Override
-    public String getSwapLocation() {
-        return swapLocation;
-    }
-
-    /**
-     * Stores the swap location. A record that is already SWAP_OUT keeps its type; any other
-     * record becomes SWAP_IN.
-     */
-    public void setSwapLocation(final String swapLocation) {
-        this.swapLocation = swapLocation;
-        if (type == RepositoryRecordType.SWAP_OUT) {
-            return;
-        }
-        type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record
-    }
-
-    @Override
-    public ContentClaim getOriginalClaim() {
-        if (originalFlowFileRecord == null) {
-            return null;
-        }
-        return originalFlowFileRecord.getContentClaim();
-    }
-
-    @Override
-    public FlowFileQueue getOriginalQueue() {
-        return originalQueue;
-    }
-
-    public void setWorking(final FlowFileRecord flowFile) {
-        workingFlowFileRecord = flowFile;
-    }
-
-    /**
-     * Sets the working FlowFile and records the attribute as updated unless it carries the same
-     * value as the original attribute.
-     */
-    public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
-        workingFlowFileRecord = flowFile;
-        trackAttributeChange(attributeKey, attributeValue);
-    }
-
-    /**
-     * Sets the working FlowFile and records every given attribute whose value differs from the
-     * original attribute value.
-     */
-    public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
-        workingFlowFileRecord = flowFile;
-
-        for (final Map.Entry<String, String> attrib : updatedAttribs.entrySet()) {
-            trackAttributeChange(attrib.getKey(), attrib.getValue());
-        }
-    }
-
-    /**
-     * Stores the attribute in the updated-attributes map unless the original attributes already
-     * hold an equal, non-null value for the key.
-     */
-    private void trackAttributeChange(final String key, final String value) {
-        final String originalValue = originalAttributes.get(key);
-        final boolean sameAsOriginal = originalValue != null && originalValue.equals(value);
-        if (!sameAsOriginal) {
-            updatedAttributes.put(key, value);
-        }
-    }
-
-    @Override
-    public boolean isAttributesChanged() {
-        return !updatedAttributes.isEmpty();
-    }
-
-    public void markForAbort() {
-        type = RepositoryRecordType.CONTENTMISSING;
-    }
-
-    @Override
-    public boolean isMarkedForAbort() {
-        return type == RepositoryRecordType.CONTENTMISSING;
-    }
-
-    public void markForDelete() {
-        type = RepositoryRecordType.DELETE;
-    }
-
-    public boolean isMarkedForDelete() {
-        return type == RepositoryRecordType.DELETE;
-    }
-
-    public void setTransferRelationship(final Relationship relationship) {
-        transferRelationship = relationship;
-    }
-
-    public Relationship getTransferRelationship() {
-        return transferRelationship;
-    }
-
-    FlowFileRecord getWorking() {
-        return workingFlowFileRecord;
-    }
-
-    ContentClaim getWorkingClaim() {
-        if (workingFlowFileRecord == null) {
-            return null;
-        }
-        return workingFlowFileRecord.getContentClaim();
-    }
-
-    /**
-     * @return the working FlowFile when one has been set, otherwise the original FlowFile
-     */
-    @Override
-    public FlowFileRecord getCurrent() {
-        if (workingFlowFileRecord != null) {
-            return workingFlowFileRecord;
-        }
-        return originalFlowFileRecord;
-    }
-
-    @Override
-    public ContentClaim getCurrentClaim() {
-        final FlowFileRecord current = getCurrent();
-        if (current == null) {
-            return null;
-        }
-        return current.getContentClaim();
-    }
-
-    @Override
-    public long getCurrentClaimOffset() {
-        final FlowFileRecord current = getCurrent();
-        if (current == null) {
-            return 0L;
-        }
-        return current.getContentClaimOffset();
-    }
-
-    boolean isWorking() {
-        return workingFlowFileRecord != null;
-    }
-
-    Map<String, String> getOriginalAttributes() {
-        return originalAttributes;
-    }
-
-    Map<String, String> getUpdatedAttributes() {
-        return updatedAttributes;
-    }
-
-    @Override
-    public String toString() {
-        return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]";
-    }
-
-    /**
-     * @return an unmodifiable view of the transient claims, or an empty list when none were added
-     */
-    @Override
-    public List<ContentClaim> getTransientClaims() {
-        if (transientClaims == null) {
-            return Collections.<ContentClaim> emptyList();
-        }
-        return Collections.unmodifiableList(transientClaims);
-    }
-
-    /**
-     * Adds a transient claim; a null claim is ignored. The backing list is created on first use.
-     */
-    void addTransientClaim(final ContentClaim claim) {
-        if (claim != null) {
-            if (transientClaims == null) {
-                transientClaims = new ArrayList<>();
-            }
-            transientClaims.add(claim);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
deleted file mode 100644
index e8ce44e..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
- private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
-
- private static final int CURRENT_ENCODING_VERSION = 9;
-
- public static final byte ACTION_CREATE = 0;
- public static final byte ACTION_UPDATE = 1;
- public static final byte ACTION_DELETE = 2;
- public static final byte ACTION_SWAPPED_OUT = 3;
- public static final byte ACTION_SWAPPED_IN = 4;
-
- private long recordsRestored = 0L;
- private final ResourceClaimManager claimManager;
-
- public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
- this.claimManager = claimManager;
- }
-
- @Override
- public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
- serializeEdit(previousRecordState, record, out, false);
- }
-
- public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
- if (record.isMarkedForAbort()) {
- logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
- out.write(ACTION_DELETE);
- out.writeLong(getRecordIdentifier(record));
- serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
- return;
- }
-
- final UpdateType updateType = getUpdateType(record);
-
- if (updateType.equals(UpdateType.DELETE)) {
- out.write(ACTION_DELETE);
- out.writeLong(getRecordIdentifier(record));
- serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
- return;
- }
-
- // If there's a Destination Connection, that's the one that we want to associated with this record.
- // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
- // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
- // so we use the originalConnection instead
- FlowFileQueue associatedQueue = record.getDestination();
- if (associatedQueue == null) {
- associatedQueue = record.getOriginalQueue();
- }
-
- if (updateType.equals(UpdateType.SWAP_OUT)) {
- out.write(ACTION_SWAPPED_OUT);
- out.writeLong(getRecordIdentifier(record));
- out.writeUTF(associatedQueue.getIdentifier());
- out.writeUTF(getLocation(record));
- return;
- }
-
- final FlowFile flowFile = record.getCurrent();
- final ContentClaim claim = record.getCurrentClaim();
-
- switch (updateType) {
- case UPDATE:
- out.write(ACTION_UPDATE);
- break;
- case CREATE:
- out.write(ACTION_CREATE);
- break;
- case SWAP_IN:
- out.write(ACTION_SWAPPED_IN);
- break;
- default:
- throw new AssertionError();
- }
-
- out.writeLong(getRecordIdentifier(record));
- out.writeLong(flowFile.getEntryDate());
- out.writeLong(flowFile.getLineageStartDate());
- out.writeLong(flowFile.getLineageStartIndex());
-
- final Long queueDate = flowFile.getLastQueueDate();
- out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
- out.writeLong(flowFile.getQueueDateIndex());
- out.writeLong(flowFile.getSize());
-
- if (associatedQueue == null) {
- logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
- new Object[] {this, record});
- writeString("", out);
- } else {
- writeString(associatedQueue.getIdentifier(), out);
- }
-
- serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
-
- if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
- out.write(1); // indicate attributes changed
- final Map<String, String> attributes = flowFile.getAttributes();
- out.writeInt(attributes.size());
- for (final Map.Entry<String, String> entry : attributes.entrySet()) {
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- } else {
- out.write(0); // indicate attributes did not change
- }
-
- if (updateType == UpdateType.SWAP_IN) {
- out.writeUTF(record.getSwapLocation());
- }
- }
-
- @Override
- public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
- final int action = in.read();
- final long recordId = in.readLong();
- if (action == ACTION_DELETE) {
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
- if (version > 4) {
- deserializeClaim(in, version, ffBuilder);
- }
-
- final FlowFileRecord flowFileRecord = ffBuilder.build();
- final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
- record.markForDelete();
-
- return record;
- }
-
- if (action == ACTION_SWAPPED_OUT) {
- final String queueId = in.readUTF();
- final String location = in.readUTF();
- final FlowFileQueue queue = getFlowFileQueue(queueId);
-
- final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .id(recordId)
- .build();
-
- return new StandardRepositoryRecord(queue, flowFileRecord, location);
- }
-
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- final RepositoryRecord record = currentRecordStates.get(recordId);
- ffBuilder.id(recordId);
- if (record != null) {
- ffBuilder.fromFlowFile(record.getCurrent());
- }
- ffBuilder.entryDate(in.readLong());
-
- if (version > 1) {
- // read the lineage identifiers and lineage start date, which were added in version 2.
- if (version < 9) {
- final int numLineageIds = in.readInt();
- for (int i = 0; i < numLineageIds; i++) {
- in.readUTF(); //skip identifiers
- }
- }
- final long lineageStartDate = in.readLong();
- final long lineageStartIndex;
- if (version > 7) {
- lineageStartIndex = in.readLong();
- } else {
- lineageStartIndex = 0L;
- }
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- if (version > 5) {
- final long lastQueueDate = in.readLong();
- final long queueDateIndex;
- if (version > 7) {
- queueDateIndex = in.readLong();
- } else {
- queueDateIndex = 0L;
- }
-
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
- }
- }
-
- ffBuilder.size(in.readLong());
- final String connectionId = readString(in);
-
- logger.debug("{} -> {}", new Object[] {recordId, connectionId});
-
- deserializeClaim(in, version, ffBuilder);
-
- // recover new attributes, if they changed
- final int attributesChanged = in.read();
- if (attributesChanged == -1) {
- throw new EOFException();
- } else if (attributesChanged == 1) {
- final int numAttributes = in.readInt();
- final Map<String, String> attributes = new HashMap<>();
- for (int i = 0; i < numAttributes; i++) {
- final String key = readString(in);
- final String value = readString(in);
- attributes.put(key, value);
- }
-
- ffBuilder.addAttributes(attributes);
- } else if (attributesChanged != 0) {
- throw new IOException("Attribute Change Qualifier not found in stream; found value: "
- + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
- }
-
- final FlowFileRecord flowFile = ffBuilder.build();
- String swapLocation = null;
- if (action == ACTION_SWAPPED_IN) {
- swapLocation = in.readUTF();
- }
-
- final FlowFileQueue queue = getFlowFileQueue(connectionId);
- final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
- if (swapLocation != null) {
- standardRepoRecord.setSwapLocation(swapLocation);
- }
-
- if (connectionId.isEmpty()) {
- logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
- standardRepoRecord.markForAbort();
- } else if (queue == null) {
- logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
- standardRepoRecord.markForAbort();
- }
-
- recordsRestored++;
- return standardRepoRecord;
- }
-
- @Override
- public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
- final int action = in.read();
- if (action == -1) {
- return null;
- }
-
- final long recordId = in.readLong();
- if (action == ACTION_DELETE) {
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
- if (version > 4) {
- deserializeClaim(in, version, ffBuilder);
- }
-
- final FlowFileRecord flowFileRecord = ffBuilder.build();
- final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
- record.markForDelete();
- return record;
- }
-
- // if action was not delete, it must be create/swap in
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- final long entryDate = in.readLong();
-
- if (version > 1) {
- // read the lineage identifiers and lineage start date, which were added in version 2.
- if (version < 9) {
- final int numLineageIds = in.readInt();
- for (int i = 0; i < numLineageIds; i++) {
- in.readUTF(); //skip identifiers
- }
- }
-
- final long lineageStartDate = in.readLong();
- final long lineageStartIndex;
- if (version > 7) {
- lineageStartIndex = in.readLong();
- } else {
- lineageStartIndex = 0L;
- }
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- if (version > 5) {
- final long lastQueueDate = in.readLong();
- final long queueDateIndex;
- if (version > 7) {
- queueDateIndex = in.readLong();
- } else {
- queueDateIndex = 0L;
- }
-
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
- }
- }
-
- final long size = in.readLong();
- final String connectionId = readString(in);
-
- logger.debug("{} -> {}", new Object[] {recordId, connectionId});
-
- ffBuilder.id(recordId);
- ffBuilder.entryDate(entryDate);
- ffBuilder.size(size);
-
- deserializeClaim(in, version, ffBuilder);
-
- final int attributesChanged = in.read();
- if (attributesChanged == 1) {
- final int numAttributes = in.readInt();
- final Map<String, String> attributes = new HashMap<>();
- for (int i = 0; i < numAttributes; i++) {
- final String key = readString(in);
- final String value = readString(in);
- attributes.put(key, value);
- }
-
- ffBuilder.addAttributes(attributes);
- } else if (attributesChanged == -1) {
- throw new EOFException();
- } else if (attributesChanged != 0) {
- throw new IOException("Attribute Change Qualifier not found in stream; found value: "
- + attributesChanged + " after successfully restoring " + recordsRestored + " records");
- }
-
- final FlowFileRecord flowFile = ffBuilder.build();
- String swapLocation = null;
- if (action == ACTION_SWAPPED_IN) {
- swapLocation = in.readUTF();
- }
-
- final StandardRepositoryRecord record;
- final FlowFileQueue queue = getFlowFileQueue(connectionId);
- record = new StandardRepositoryRecord(queue, flowFile);
- if (swapLocation != null) {
- record.setSwapLocation(swapLocation);
- }
-
- if (connectionId.isEmpty()) {
- logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
- record.markForAbort();
- } else if (queue == null) {
- logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
- record.markForAbort();
- }
-
- recordsRestored++;
- return record;
- }
-
- @Override
- public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
- serializeEdit(null, record, out, true);
- }
-
- private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
- if (claim == null) {
- out.write(0);
- } else {
- out.write(1);
-
- final ResourceClaim resourceClaim = claim.getResourceClaim();
- writeString(resourceClaim.getId(), out);
- writeString(resourceClaim.getContainer(), out);
- writeString(resourceClaim.getSection(), out);
- out.writeLong(claim.getOffset());
- out.writeLong(claim.getLength());
-
- out.writeLong(offset);
- out.writeBoolean(resourceClaim.isLossTolerant());
- }
- }
-
- private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
- // determine current Content Claim.
- final int claimExists = in.read();
- if (claimExists == 1) {
- final String claimId;
- if (serializationVersion < 4) {
- claimId = String.valueOf(in.readLong());
- } else {
- claimId = readString(in);
- }
-
- final String container = readString(in);
- final String section = readString(in);
-
- final long resourceOffset;
- final long resourceLength;
- if (serializationVersion < 7) {
- resourceOffset = 0L;
- resourceLength = -1L;
- } else {
- resourceOffset = in.readLong();
- resourceLength = in.readLong();
- }
-
- final long claimOffset = in.readLong();
-
- final boolean lossTolerant;
- if (serializationVersion >= 3) {
- lossTolerant = in.readBoolean();
- } else {
- lossTolerant = false;
- }
-
- final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
- final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
- contentClaim.setLength(resourceLength);
-
- ffBuilder.contentClaim(contentClaim);
- ffBuilder.contentClaimOffset(claimOffset);
- } else if (claimExists == -1) {
- throw new EOFException();
- } else if (claimExists != 0) {
- throw new IOException("Claim Existence Qualifier not found in stream; found value: "
- + claimExists + " after successfully restoring " + recordsRestored + " records");
- }
- }
-
- private void writeString(final String toWrite, final OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes("UTF-8");
- final int utflen = bytes.length;
-
- if (utflen < 65535) {
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- } else {
- out.write(255);
- out.write(255);
- out.write(utflen >>> 24);
- out.write(utflen >>> 16);
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- }
- }
-
- private String readString(final InputStream in) throws IOException {
- final Integer numBytes = readFieldLength(in);
- if (numBytes == null) {
- throw new EOFException();
- }
- final byte[] bytes = new byte[numBytes];
- fillBuffer(in, bytes, numBytes);
- return new String(bytes, "UTF-8");
- }
-
- private Integer readFieldLength(final InputStream in) throws IOException {
- final int firstValue = in.read();
- final int secondValue = in.read();
- if (firstValue < 0) {
- return null;
- }
- if (secondValue < 0) {
- throw new EOFException();
- }
- if (firstValue == 0xff && secondValue == 0xff) {
- final int ch1 = in.read();
- final int ch2 = in.read();
- final int ch3 = in.read();
- final int ch4 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0) {
- throw new EOFException();
- }
- return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
- } else {
- return (firstValue << 8) + secondValue;
- }
- }
-
- private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
- int bytesRead;
- int totalBytesRead = 0;
- while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
- totalBytesRead += bytesRead;
- }
- if (totalBytesRead != length) {
- throw new EOFException();
- }
- }
-
- @Override
- public int getVersion() {
- return CURRENT_ENCODING_VERSION;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
deleted file mode 100644
index 39a2591..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-
-/**
- * <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow files may reference the same content by both having the same content claim.</p>
- *
- * <p>
- * Must be thread safe</p>
- *
- */
-public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
-
- private final ResourceClaim resourceClaim;
- private final long offset;
- private volatile long length;
-
- public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
- this.resourceClaim = resourceClaim;
- this.offset = offset;
- this.length = -1L;
- }
-
- public void setLength(final long length) {
- this.length = length;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result;
- result = prime * result + (int) (offset ^ offset >>> 32);
- result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- if (!(obj instanceof ContentClaim)) {
- return false;
- }
-
- final ContentClaim other = (ContentClaim) obj;
- if (offset != other.getOffset()) {
- return false;
- }
-
- return resourceClaim.equals(other.getResourceClaim());
- }
-
- @Override
- public int compareTo(final ContentClaim o) {
- final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
- if (resourceComp != 0) {
- return resourceComp;
- }
-
- return Long.compare(offset, o.getOffset());
- }
-
- @Override
- public ResourceClaim getResourceClaim() {
- return resourceClaim;
- }
-
- @Override
- public long getOffset() {
- return offset;
- }
-
- @Override
- public long getLength() {
- return length;
- }
-
- @Override
- public String toString() {
- return "StandardContentClaim [resourceClaim=" + resourceClaim + ", offset=" + offset + ", length=" + length + "]";
- }
-}