You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/16 21:33:36 UTC
nifi git commit: NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate
Repository: nifi
Updated Branches:
refs/heads/master 28e1bcc9d -> 7c1ce1722
NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate
This closes #2396.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7c1ce172
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7c1ce172
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7c1ce172
Branch: refs/heads/master
Commit: 7c1ce172232d5fd8ab2a1c1649a9dcbf1a9d08d7
Parents: 28e1bcc
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 10 09:04:52 2018 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 16 16:31:34 2018 -0500
----------------------------------------------------------------------
.../WriteAvroSchemaAttributeStrategy.java | 26 ++-
.../processors/kafka/pubsub/PublisherLease.java | 24 +-
.../pubsub/TestPublishKafkaRecord_0_11.java | 2 +-
.../kafka/pubsub/TestPublisherLease.java | 5 +-
.../nifi/processors/kafka/pubsub/TestUtils.java | 45 ----
.../kafka/pubsub/util/MockRecordWriter.java | 2 +-
.../processors/kafka/test/EmbeddedKafka.java | 226 -------------------
.../nifi/processors/kafka/pubsub/TestUtils.java | 45 ----
.../processors/kafka/pubsub/PublisherLease.java | 24 +-
.../pubsub/TestPublishKafkaRecord_1_0.java | 2 +-
.../kafka/pubsub/TestPublisherLease.java | 2 +
.../nifi/processors/kafka/pubsub/TestUtils.java | 45 ----
.../kafka/pubsub/util/MockRecordWriter.java | 2 +-
.../processors/kafka/test/EmbeddedKafka.java | 226 -------------------
.../org/apache/nifi/json/WriteJsonResult.java | 9 +-
15 files changed, 74 insertions(+), 611 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
index d9be673..5f94679 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
@@ -29,6 +31,12 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;
public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
+ private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) {
+ return size() > 10;
+ }
+ };
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@@ -36,8 +44,22 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
@Override
public Map<String, String> getAttributes(final RecordSchema schema) {
- final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
- final String schemaText = avroSchema.toString();
+ // First, check if schema has the Avro Text available already.
+ final Optional<String> schemaFormat = schema.getSchemaFormat();
+ if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) {
+ final Optional<String> schemaText = schema.getSchemaText();
+ if (schemaText.isPresent()) {
+ return Collections.singletonMap("avro.schema", schemaText.get());
+ }
+ }
+
+ String schemaText = avroSchemaTextCache.get(schema);
+ if (schemaText == null) {
+ final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+ schemaText = avroSchema.toString();
+ avroSchemaTextCache.put(schema, schemaText);
+ }
+
return Collections.singletonMap("avro.schema", schemaText);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 72c90d2..2e25129 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -164,8 +166,10 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
+ Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
- writer.write(record);
+ final WriteResult writeResult = writer.write(record);
+ additionalAttributes = writeResult.getAttributes();
writer.flush();
}
@@ -173,7 +177,7 @@ public class PublisherLease implements Closeable {
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
- publish(flowFile, messageKey, messageContent, topic, tracker);
+ publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -195,7 +199,7 @@ public class PublisherLease implements Closeable {
}
}
- private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+ private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
if (attributeNameRegex == null) {
return;
}
@@ -206,11 +210,23 @@ public class PublisherLease implements Closeable {
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
}
}
+
+ for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
+ if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+ headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+ }
+ }
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+ publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+ }
+
+ protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
+ final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
- addHeaders(flowFile, record);
+ addHeaders(flowFile, additionalAttributes, record);
producer.send(record, new Callback() {
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
index b7d4abd..9a209d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_11 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+ verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index d2b52dd..3ab7abb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -38,12 +38,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
-import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -270,13 +270,12 @@ public class TestPublisherLease {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();
- final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
-
final String topic = "unit-test";
final String keyField = "person_id";
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+ Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
- public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
- field.set(instance, newValue);
- }
-
- static Unsafe getUnsafe() {
- try {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- return (Unsafe) f.get(null);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 90a909d..0eb8606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
@Override
public WriteResult write(Record record) throws IOException {
- return null;
+ return WriteResult.of(1, Collections.emptyMap());
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
deleted file mode 100644
index a720b11..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
-/**
- * Embedded Kafka server, primarily to be used for testing.
- */
-public class EmbeddedKafka {
-
- private final KafkaServerStartable kafkaServer;
-
- private final Properties zookeeperConfig;
-
- private final Properties kafkaConfig;
-
- private final ZooKeeperServer zkServer;
-
- private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
-
- private final int kafkaPort;
-
- private final int zookeeperPort;
-
- private boolean started;
-
- /**
- * Will create instance of the embedded Kafka server. Kafka and Zookeeper
- * configuration properties will be loaded from 'server.properties' and
- * 'zookeeper.properties' located at the root of the classpath.
- */
- public EmbeddedKafka() {
- this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
- }
-
- /**
- * Will create instance of the embedded Kafka server.
- *
- * @param kafkaConfig
- * Kafka configuration properties
- * @param zookeeperConfig
- * Zookeeper configuration properties
- */
- public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
- this.cleanupKafkaWorkDir();
- this.zookeeperConfig = zookeeperConfig;
- this.kafkaConfig = kafkaConfig;
- this.kafkaPort = this.availablePort();
- this.zookeeperPort = this.availablePort();
-
- this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
- this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
- this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
- this.zkServer = new ZooKeeperServer();
- this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
- }
-
- /**
- *
- * @return port for Kafka server
- */
- public int getKafkaPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
- }
- return this.kafkaPort;
- }
-
- /**
- *
- * @return port for Zookeeper server
- */
- public int getZookeeperPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
- }
- return this.zookeeperPort;
- }
-
- /**
- * Will start embedded Kafka server. Its data directories will be created
- * at 'kafka-tmp' directory relative to the working directory of the current
- * runtime. The data directories will be deleted upon JVM exit.
- *
- */
- public void start() {
- if (!this.started) {
- logger.info("Starting Zookeeper server");
- this.startZookeeper();
-
- logger.info("Starting Kafka server");
- this.kafkaServer.startup();
-
- logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
- + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
- this.started = true;
- }
- }
-
- /**
- * Will stop embedded Kafka server, cleaning up all working directories.
- */
- public void stop() {
- if (this.started) {
- logger.info("Shutting down Kafka server");
- this.kafkaServer.shutdown();
- this.kafkaServer.awaitShutdown();
- logger.info("Shutting down Zookeeper server");
- this.shutdownZookeeper();
- logger.info("Embedded Kafka is shut down.");
- this.cleanupKafkaWorkDir();
- this.started = false;
- }
- }
-
- /**
- *
- */
- private void cleanupKafkaWorkDir() {
- File kafkaTmp = new File("target/kafka-tmp");
- try {
- FileUtils.deleteDirectory(kafkaTmp);
- } catch (Exception e) {
- logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
- }
- }
-
- /**
- * Will start Zookeeper server via {@link ServerCnxnFactory}
- */
- private void startZookeeper() {
- QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
- try {
- quorumConfiguration.parseProperties(this.zookeeperConfig);
-
- ServerConfig configuration = new ServerConfig();
- configuration.readFrom(quorumConfiguration);
-
- FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
-
- zkServer.setTxnLogFactory(txnLog);
- zkServer.setTickTime(configuration.getTickTime());
- zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
- zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
- ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
- zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
- configuration.getMaxClientCnxns());
- zookeeperConnectionFactory.startup(zkServer);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to start Zookeeper server", e);
- }
- }
-
- /**
- * Will shut down Zookeeper server.
- */
- private void shutdownZookeeper() {
- zkServer.shutdown();
- }
-
- /**
- * Will load {@link Properties} from properties file discovered at the
- * provided path relative to the root of the classpath.
- */
- private static Properties loadPropertiesFromClasspath(String path) {
- try {
- Properties kafkaProperties = new Properties();
- kafkaProperties.load(Class.class.getResourceAsStream(path));
- return kafkaProperties;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * Will determine the available port used by Kafka/Zookeeper servers.
- */
- private int availablePort() {
- ServerSocket s = null;
- try {
- s = new ServerSocket(0);
- s.setReuseAddress(true);
- return s.getLocalPort();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to discover available port.", e);
- } finally {
- try {
- s.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
- public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
- field.set(instance, newValue);
- }
-
- static Unsafe getUnsafe() {
- try {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- return (Unsafe) f.get(null);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 2b1cfe2..1c241a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -163,8 +165,10 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
+ Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
- writer.write(record);
+ final WriteResult writeResult = writer.write(record);
+ additionalAttributes = writeResult.getAttributes();
writer.flush();
}
@@ -172,7 +176,7 @@ public class PublisherLease implements Closeable {
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
- publish(flowFile, messageKey, messageContent, topic, tracker);
+ publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -194,7 +198,7 @@ public class PublisherLease implements Closeable {
}
}
- private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+ private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
if (attributeNameRegex == null) {
return;
}
@@ -205,11 +209,23 @@ public class PublisherLease implements Closeable {
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
}
}
+
+ for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
+ if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+ headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+ }
+ }
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+ publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+ }
+
+ protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
+ final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
- addHeaders(flowFile, record);
+ addHeaders(flowFile, additionalAttributes, record);
producer.send(record, new Callback() {
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
index 45439cc..abadc89 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+ verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index b2e1b0e..2fbf539 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -43,6 +43,7 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -274,6 +275,7 @@ public class TestPublisherLease {
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+ Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
- public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
- field.set(instance, newValue);
- }
-
- static Unsafe getUnsafe() {
- try {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- return (Unsafe) f.get(null);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 90a909d..0eb8606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
@Override
public WriteResult write(Record record) throws IOException {
- return null;
+ return WriteResult.of(1, Collections.emptyMap());
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
deleted file mode 100644
index a720b11..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
-/**
- * Embedded Kafka server, primarily to be used for testing.
- */
-public class EmbeddedKafka {
-
- private final KafkaServerStartable kafkaServer;
-
- private final Properties zookeeperConfig;
-
- private final Properties kafkaConfig;
-
- private final ZooKeeperServer zkServer;
-
- private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
-
- private final int kafkaPort;
-
- private final int zookeeperPort;
-
- private boolean started;
-
- /**
- * Will create instance of the embedded Kafka server. Kafka and Zookeeper
- * configuration properties will be loaded from 'server.properties' and
- * 'zookeeper.properties' located at the root of the classpath.
- */
- public EmbeddedKafka() {
- this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
- }
-
- /**
- * Will create instance of the embedded Kafka server.
- *
- * @param kafkaConfig
- * Kafka configuration properties
- * @param zookeeperConfig
- * Zookeeper configuration properties
- */
- public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
- this.cleanupKafkaWorkDir();
- this.zookeeperConfig = zookeeperConfig;
- this.kafkaConfig = kafkaConfig;
- this.kafkaPort = this.availablePort();
- this.zookeeperPort = this.availablePort();
-
- this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
- this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
- this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
- this.zkServer = new ZooKeeperServer();
- this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
- }
-
- /**
- *
- * @return port for Kafka server
- */
- public int getKafkaPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
- }
- return this.kafkaPort;
- }
-
- /**
- *
- * @return port for Zookeeper server
- */
- public int getZookeeperPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
- }
- return this.zookeeperPort;
- }
-
- /**
- * Will start embedded Kafka server. Its data directories will be created
- * at 'kafka-tmp' directory relative to the working directory of the current
- * runtime. The data directories will be deleted upon JVM exit.
- *
- */
- public void start() {
- if (!this.started) {
- logger.info("Starting Zookeeper server");
- this.startZookeeper();
-
- logger.info("Starting Kafka server");
- this.kafkaServer.startup();
-
- logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
- + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
- this.started = true;
- }
- }
-
- /**
- * Will stop embedded Kafka server, cleaning up all working directories.
- */
- public void stop() {
- if (this.started) {
- logger.info("Shutting down Kafka server");
- this.kafkaServer.shutdown();
- this.kafkaServer.awaitShutdown();
- logger.info("Shutting down Zookeeper server");
- this.shutdownZookeeper();
- logger.info("Embedded Kafka is shut down.");
- this.cleanupKafkaWorkDir();
- this.started = false;
- }
- }
-
- /**
- *
- */
- private void cleanupKafkaWorkDir() {
- File kafkaTmp = new File("target/kafka-tmp");
- try {
- FileUtils.deleteDirectory(kafkaTmp);
- } catch (Exception e) {
- logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
- }
- }
-
- /**
- * Will start Zookeeper server via {@link ServerCnxnFactory}
- */
- private void startZookeeper() {
- QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
- try {
- quorumConfiguration.parseProperties(this.zookeeperConfig);
-
- ServerConfig configuration = new ServerConfig();
- configuration.readFrom(quorumConfiguration);
-
- FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
-
- zkServer.setTxnLogFactory(txnLog);
- zkServer.setTickTime(configuration.getTickTime());
- zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
- zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
- ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
- zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
- configuration.getMaxClientCnxns());
- zookeeperConnectionFactory.startup(zkServer);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to start Zookeeper server", e);
- }
- }
-
- /**
- * Will shut down Zookeeper server.
- */
- private void shutdownZookeeper() {
- zkServer.shutdown();
- }
-
- /**
- * Will load {@link Properties} from properties file discovered at the
- * provided path relative to the root of the classpath.
- */
- private static Properties loadPropertiesFromClasspath(String path) {
- try {
- Properties kafkaProperties = new Properties();
- kafkaProperties.load(Class.class.getResourceAsStream(path));
- return kafkaProperties;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * Will determine the available port used by Kafka/Zookeeper servers.
- */
- private int availablePort() {
- ServerSocket s = null;
- try {
- s = new ServerSocket(0);
- s.setReuseAddress(true);
- return s.getLocalPort();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to discover available port.", e);
- } finally {
- try {
- s.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index fc84181..41a72c7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.text.DateFormat;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -118,30 +117,26 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
- boolean firstRecord = false;
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
- firstRecord = true;
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
- return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
+ return schemaAccess.getAttributes(recordSchema);
}
@Override
public WriteResult writeRawRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
- boolean firstRecord = false;
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
- firstRecord = true;
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
- final Map<String, String> attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
+ final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
return WriteResult.of(incrementRecordCount(), attributes);
}