You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/09 04:33:52 UTC
[GitHub] bdesert closed pull request #3250: NIFI-5937 use processor-configured encoding instead of the system default
bdesert closed pull request #3250: NIFI-5937 use processor-configured encoding instead of the system default
URL: https://github.com/apache/nifi/pull/3250
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff once it is merged, so the diff is reproduced below for
the sake of provenance:
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index ac36604981..52de42442a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -73,6 +73,7 @@
import java.io.InputStream;
import java.math.BigInteger;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -198,6 +199,7 @@
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
descriptors.add(TYPE);
+ descriptors.add(CHARSET);
descriptors.add(INDEX_OP);
descriptors.add(SUPPRESS_NULLS);
@@ -313,6 +315,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
final StringBuilder sb = new StringBuilder();
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
int recordCount = 0;
try (final InputStream in = session.read(flowFile);
@@ -345,7 +348,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
writeRecord(record, record.getSchema(), generator);
generator.flush();
generator.close();
- json.append(out.toString());
+ json.append(out.toString(charset.name()));
buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
recordCount++;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 862e177068..2cc16c1aba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -16,18 +16,15 @@
*/
package org.apache.nifi.processors.elasticsearch;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -42,16 +39,21 @@
import org.junit.Ignore;
import org.junit.Test;
-import okhttp3.Call;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
-public class TestPutElasticsearchHttpRecord {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+public class TestPutElasticsearchHttpRecord {
private TestRunner runner;
@After
@@ -61,7 +63,25 @@ public void teardown() {
@Test
public void testPutElasticSearchOnTriggerIndex() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+ PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+ processor.setRecordChecks(record -> {
+ assertEquals(1, record.get("id"));
+ assertEquals("reç1", record.get("name"));
+ assertEquals(101, record.get("code"));
+ }, record -> {
+ assertEquals(2, record.get("id"));
+ assertEquals("ræc2", record.get("name"));
+ assertEquals(102, record.get("code"));
+ }, record -> {
+ assertEquals(3, record.get("id"));
+ assertEquals("rèc3", record.get("name"));
+ assertEquals(103, record.get("code"));
+ }, record -> {
+ assertEquals(4, record.get("id"));
+ assertEquals("rëc4", record.get("name"));
+ assertEquals(104, record.get("code"));
+ });
+ runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@@ -368,6 +388,7 @@ public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
int statusCode = 200;
String statusMessage = "OK";
String expectedUrl = null;
+ Consumer<Map>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.responseHasFailures = responseHasFailures;
@@ -382,6 +403,11 @@ void setExpectedUrl(String url) {
expectedUrl = url;
}
+ @SafeVarargs
+ final void setRecordChecks(Consumer<Map>... checks) {
+ recordChecks = checks;
+ }
+
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
@@ -391,6 +417,24 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE
if (statusCode != -1) {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
+ if (recordChecks != null) {
+ final ObjectMapper mapper = new ObjectMapper();
+ Buffer sink = new Buffer();
+ realRequest.body().writeTo(sink);
+ String line;
+ int recordIndex = 0;
+ boolean content = false;
+ while ((line = sink.readUtf8Line()) != null) {
+ if (content) {
+ content = false;
+ if (recordIndex < recordChecks.length) {
+ recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class));
+ }
+ } else {
+ content = true;
+ }
+ }
+ }
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
sb.append(responseHasFailures);
sb.append("\", \"items\": [");
@@ -521,9 +565,9 @@ private void generateTestData() throws IOException {
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
- parser.addRecord(1, "rec1", 101);
- parser.addRecord(2, "rec2", 102);
- parser.addRecord(3, "rec3", 103);
- parser.addRecord(4, "rec4", 104);
+ parser.addRecord(1, "reç1", 101);
+ parser.addRecord(2, "ræc2", 102);
+ parser.addRecord(3, "rèc3", 103);
+ parser.addRecord(4, "rëc4", 104);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services