You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/22 13:37:36 UTC
[pulsar] branch master updated: Fixed KCA Sink handling of Json and
Avro;
support for kafka connectors that overload task.preCommit() directly
(#11905)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebad71a Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly (#11905)
ebad71a is described below
commit ebad71aed7991e4494f81cf1980408f5bbff980a
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Sep 22 06:36:11 2021 -0700
Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly (#11905)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 16 ++-
.../io/kafka/connect/schema/KafkaConnectData.java | 160 +++++++++++++++++++++
.../io/kafka/connect/KafkaConnectSinkTest.java | 98 ++++++++++---
.../kafka/connect/SchemaedFileStreamSinkTask.java | 16 ++-
4 files changed, 268 insertions(+), 22 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 8b60f54..268105c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import java.util.List;
@@ -199,9 +200,14 @@ public class KafkaConnectSink implements Sink<GenericObject> {
final Record<GenericObject> lastNotFlushed = pendingFlushQueue.getLast();
try {
Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets();
- task.flush(currentOffsets);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = task.preCommit(currentOffsets);
+ if (committedOffsets.isEmpty()) {
+ log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
+ return;
+ }
taskContext.flushOffsets(currentOffsets);
ackUntil(lastNotFlushed, Record::ack);
+ log.info("Flush succeeded");
} catch (Throwable t) {
log.error("error flushing pending records", t);
ackUntil(lastNotFlushed, Record::fail);
@@ -222,7 +228,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
}
@SuppressWarnings("rawtypes")
- private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
+ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
final String topic = sourceRecord.getTopicName().orElse(topicName);
final Object key;
@@ -237,10 +243,10 @@ public class KafkaConnectSink implements Sink<GenericObject> {
&& sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
- key = kv.getKey();
- value = kv.getValue();
keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
+ key = kv.getKey();
+ value = kv.getValue();
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
@@ -250,7 +256,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
keySchema = Schema.STRING_SCHEMA;
}
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
- value = sourceRecord.getValue().getNativeObject();
+ value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
}
long offset = sourceRecord.getRecordSequence()
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
new file mode 100644
index 0000000..b649a9b
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericData;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaConnectData {
+ public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
+ if (kafkaSchema == null) {
+ return nativeObject;
+ }
+
+ if (nativeObject == null) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
+ if (nativeObject instanceof JsonNode) {
+ JsonNode node = (JsonNode) nativeObject;
+ return jsonAsConnectData(node, kafkaSchema);
+ } else if (nativeObject instanceof GenericData.Record) {
+ GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+ return avroAsConnectData(avroRecord, kafkaSchema);
+ }
+
+ return nativeObject;
+ }
+
+ static Object avroAsConnectData(GenericData.Record avroRecord, Schema kafkaSchema) {
+ if (kafkaSchema == null) {
+ if (avroRecord == null) {
+ return null;
+ }
+ throw new DataException("Don't know how to convert " + avroRecord + " to Connect data (schema is null).");
+ }
+
+ Struct struct = new Struct(kafkaSchema);
+ for (Field field : kafkaSchema.fields()) {
+ struct.put(field, getKafkaConnectData(avroRecord.get(field.name()), field.schema()));
+ }
+ return struct;
+ }
+
+ // with some help of
+ // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+ static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
+ if (kafkaSchema == null) {
+ if (jsonNode == null || jsonNode.isNull()) {
+ return null;
+ }
+ switch (jsonNode.getNodeType()) {
+ case BINARY:
+ try {
+ return jsonNode.binaryValue();
+ } catch (IOException e) {
+ throw new DataException("Cannot get binary value for " + jsonNode);
+ }
+ case BOOLEAN:
+ return jsonNode.booleanValue();
+ case NUMBER:
+ jsonNode.doubleValue();
+ case STRING:
+ jsonNode.textValue();
+ default:
+ throw new DataException("Don't know how to convert " + jsonNode +
+ " to Connect data (schema is null).");
+ }
+ }
+
+ if (jsonNode == null || jsonNode.isNull()) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
+ switch (kafkaSchema.type()) {
+ case INT8:
+ return (byte)jsonNode.shortValue();
+ case INT16:
+ return jsonNode.shortValue();
+ case INT32:
+ return jsonNode.intValue();
+ case INT64:
+ return jsonNode.longValue();
+ case FLOAT32:
+ return jsonNode.floatValue();
+ case FLOAT64:
+ return jsonNode.doubleValue();
+ case BOOLEAN:
+ return jsonNode.booleanValue();
+ case STRING:
+ return jsonNode.textValue();
+ case BYTES:
+ try {
+ return jsonNode.binaryValue();
+ } catch (IOException e) {
+ throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
+ }
+ case ARRAY:
+ List<Object> list = new ArrayList<>();
+ Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
+ for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+ list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
+ }
+ return list;
+ case MAP:
+ Map<String, Object> map = new HashMap<>();
+ for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
+ Map.Entry<String, JsonNode> elem = it.next();
+ map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+ }
+ return map;
+ case STRUCT:
+ Struct struct = new Struct(kafkaSchema);
+ for (Field field: kafkaSchema.fields()) {
+ struct.put(field, jsonAsConnectData(jsonNode.get(field.name()), field.schema()));
+ }
+ return struct;
+ default:
+ throw new DataException("Unknown schema type " + kafkaSchema.type());
+ }
+ }
+
+ private static Object defaultOrThrow(Schema kafkaSchema) {
+ if (kafkaSchema.defaultValue() != null) {
+ return kafkaSchema.defaultValue(); // any logical type conversions should already have been applied
+ }
+ if (kafkaSchema.isOptional()) {
+ return null;
+ }
+ throw new DataException("Invalid null value for required " + kafkaSchema.type() + " field");
+ }
+}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 92d2a25..b561e4b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -20,9 +20,13 @@
package org.apache.pulsar.io.kafka.connect;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -34,13 +38,17 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -124,7 +132,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn(rec);
- when(msg.getMessageId()).thenReturn(new MessageIdImpl(0, 0, 0));
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
@@ -230,11 +238,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
}
- private void recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
- recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
+ private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
+ return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
}
- private void recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
+ private SinkRecord recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
Object expected, String expectedSchema) throws Exception {
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
@@ -246,7 +254,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
when(msg.getValue()).thenReturn(rec);
when(msg.getKey()).thenReturn("key");
when(msg.hasKey()).thenReturn(true);
- when(msg.getMessageId()).thenReturn(new MessageIdImpl(0, 0, 0));
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
@@ -272,6 +280,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
assertEquals(result.get("value"), expected);
assertEquals(result.get("keySchema"), expectedKeySchema);
assertEquals(result.get("valueSchema"), expectedSchema);
+
+ SinkRecord sinkRecord = sink.toSinkRecord(record);
+ return sinkRecord;
}
private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -289,50 +300,69 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void bytesRecordSchemaTest() throws Exception {
- recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), Schema.BYTES, "val", "BYTES");
+ byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
+ SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
+ byte[] out = (byte[]) sinkRecord.value();
+ Assert.assertEquals(out, in);
}
@Test
public void stringRecordSchemaTest() throws Exception {
- recordSchemaTest("val", Schema.STRING, "val", "STRING");
+ SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
+ String out = (String) sinkRecord.value();
+ Assert.assertEquals(out, "val");
}
@Test
public void booleanRecordSchemaTest() throws Exception {
- recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
+ SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
+ boolean out = (boolean) sinkRecord.value();
+ Assert.assertEquals(out, true);
}
@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+ SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+ byte out = (byte) sinkRecord.value();
+ Assert.assertEquals(out, 1);
}
@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+ SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+ short out = (short) sinkRecord.value();
+ Assert.assertEquals(out, 1);
}
@Test
public void integerRecordSchemaTest() throws Exception {
- recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
+ SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
+ int out = (int) sinkRecord.value();
+ Assert.assertEquals(out, Integer.MAX_VALUE);
}
@Test
public void longRecordSchemaTest() throws Exception {
- recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
+ SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
+ long out = (long) sinkRecord.value();
+ Assert.assertEquals(out, Long.MAX_VALUE);
}
@Test
public void floatRecordSchemaTest() throws Exception {
// 1.0d is coming back from ObjectMapper
- recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
+ SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
+ float out = (float) sinkRecord.value();
+ Assert.assertEquals(out, 1.0d);
}
@Test
public void doubleRecordSchemaTest() throws Exception {
- recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
+ SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
+ double out = (double) sinkRecord.value();
+ Assert.assertEquals(out, Double.MAX_VALUE);
}
@Test
@@ -347,13 +377,45 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
obj.setField2("test");
obj.setField3(100L);
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonNode = mapper.valueToTree(obj);
+
Map<String, Object> expected = new LinkedHashMap<>();
expected.put("field1", 10);
expected.put("field2", "test");
// integer is coming back from ObjectMapper
expected.put("field3", 100);
- recordSchemaTest(obj, jsonSchema, expected, "STRUCT");
+ SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
+
+ Struct out = (Struct) sinkRecord.value();
+ Assert.assertEquals((int)out.get("field1"), 10);
+ Assert.assertEquals((String)out.get("field2"), "test");
+ Assert.assertEquals((long)out.get("field3"), 100L);
+ }
+
+ @Test
+ public void avroSchemaTest() throws Exception {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+ obj.put("field1", 10);
+ obj.put("field2", "test");
+ obj.put("field3", 100L);
+
+ Map<String, Object> expected = new LinkedHashMap<>();
+ expected.put("field1", 10);
+ expected.put("field2", "test");
+ // integer is coming back from ObjectMapper
+ expected.put("field3", 100);
+
+ SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
+
+ Struct out = (Struct) sinkRecord.value();
+ Assert.assertEquals((int)out.get("field1"), 10);
+ Assert.assertEquals((String)out.get("field2"), "test");
+ Assert.assertEquals((long)out.get("field3"), 100L);
}
@Test
@@ -390,7 +452,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void KeyValueSchemaTest() throws Exception {
KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
- recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+ SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+ String val = (String) sinkRecord.value();
+ Assert.assertEquals(val, "value");
+ int key = (int) sinkRecord.key();
+ Assert.assertEquals(key, 11);
}
@Test
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
index 91c0790..9821a58 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.io.kafka.connect;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.file.FileStreamSinkTask;
import org.apache.kafka.connect.sink.SinkRecord;
import org.testng.collections.Maps;
@@ -51,7 +53,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
recOut.put("keySchema", record.keySchema().type().toString());
recOut.put("valueSchema", record.valueSchema().type().toString());
recOut.put("key", record.key());
- recOut.put("value", val);
+ if (val instanceof Struct) {
+ Map<String, Object> map = Maps.newHashMap();
+ Struct struct = (Struct)val;
+
+ // no recursion needed for tests
+ for (Field f: struct.schema().fields()) {
+ map.put(f.name(), struct.get(f));
+ }
+
+ recOut.put("value", map);
+ } else {
+ recOut.put("value", val);
+ }
ObjectMapper om = new ObjectMapper();
try {