You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/09 04:33:52 UTC

[GitHub] bdesert closed pull request #3250: NIFI-5937 use processor-configured encoding instead of the system default

bdesert closed pull request #3250: NIFI-5937 use processor-configured encoding instead of the system default
URL: https://github.com/apache/nifi/pull/3250
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index ac36604981..52de42442a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -73,6 +73,7 @@
 import java.io.InputStream;
 import java.math.BigInteger;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -198,6 +199,7 @@
         descriptors.add(ID_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
+        descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
 
@@ -313,6 +315,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
         final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
         final StringBuilder sb = new StringBuilder();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         int recordCount = 0;
         try (final InputStream in = session.read(flowFile);
@@ -345,7 +348,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 writeRecord(record, record.getSchema(), generator);
                 generator.flush();
                 generator.close();
-                json.append(out.toString());
+                json.append(out.toString(charset.name()));
 
                 buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
                 recordCount++;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 862e177068..2cc16c1aba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -16,18 +16,15 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -42,16 +39,21 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
-import okhttp3.Call;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
 
-public class TestPutElasticsearchHttpRecord {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+public class TestPutElasticsearchHttpRecord {
     private TestRunner runner;
 
     @After
@@ -61,7 +63,25 @@ public void teardown() {
 
     @Test
     public void testPutElasticSearchOnTriggerIndex() throws IOException {
-        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        processor.setRecordChecks(record -> {
+            assertEquals(1, record.get("id"));
+            assertEquals("reç1", record.get("name"));
+            assertEquals(101, record.get("code"));
+        }, record -> {
+            assertEquals(2, record.get("id"));
+            assertEquals("ræc2", record.get("name"));
+            assertEquals(102, record.get("code"));
+        }, record -> {
+            assertEquals(3, record.get("id"));
+            assertEquals("rèc3", record.get("name"));
+            assertEquals(103, record.get("code"));
+        }, record -> {
+            assertEquals(4, record.get("id"));
+            assertEquals("rëc4", record.get("name"));
+            assertEquals(104, record.get("code"));
+        });
+        runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
@@ -368,6 +388,7 @@ public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
         int statusCode = 200;
         String statusMessage = "OK";
         String expectedUrl = null;
+        Consumer<Map>[] recordChecks;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
@@ -382,6 +403,11 @@ void setExpectedUrl(String url) {
             expectedUrl = url;
         }
 
+        @SafeVarargs
+        final void setRecordChecks(Consumer<Map>... checks) {
+            recordChecks = checks;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -391,6 +417,24 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE
                 if (statusCode != -1) {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
                     assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
+                    if (recordChecks != null) {
+                        final ObjectMapper mapper = new ObjectMapper();
+                        Buffer sink = new Buffer();
+                        realRequest.body().writeTo(sink);
+                        String line;
+                        int recordIndex = 0;
+                        boolean content = false;
+                        while ((line = sink.readUtf8Line()) != null) {
+                            if (content) {
+                                content = false;
+                                if (recordIndex < recordChecks.length) {
+                                    recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class));
+                                }
+                            } else {
+                                content = true;
+                            }
+                        }
+                    }
                     StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
                     sb.append(responseHasFailures);
                     sb.append("\", \"items\": [");
@@ -521,9 +565,9 @@ private void generateTestData() throws IOException {
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("code", RecordFieldType.INT);
 
-        parser.addRecord(1, "rec1", 101);
-        parser.addRecord(2, "rec2", 102);
-        parser.addRecord(3, "rec3", 103);
-        parser.addRecord(4, "rec4", 104);
+        parser.addRecord(1, "reç1", 101);
+        parser.addRecord(2, "ræc2", 102);
+        parser.addRecord(3, "rèc3", 103);
+        parser.addRecord(4, "rëc4", 104);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services