You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:49 UTC
[34/50] [abbrv] beam git commit: [BEAM-2410] Remove TransportClient
from ElasticSearchIO to decouple IO and ES server versions
[BEAM-2410] Remove TransportClient from ElasticSearchIO to decouple IO and ES server versions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7caea7a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7caea7a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7caea7a8
Branch: refs/heads/gearpump-runner
Commit: 7caea7a845eff072a647baf69b9b004db4523652
Parents: e980ae9
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Jun 5 16:21:58 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Jun 9 07:31:06 2017 +0200
----------------------------------------------------------------------
.../sdk/io/common/IOTestPipelineOptions.java | 6 +-
.../sdk/io/elasticsearch/ElasticsearchIO.java | 4 +-
.../elasticsearch/ElasticSearchIOTestUtils.java | 81 +++++++++++---------
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 14 ++--
.../io/elasticsearch/ElasticsearchIOTest.java | 36 +++++----
.../elasticsearch/ElasticsearchTestDataSet.java | 37 ++++-----
6 files changed, 87 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 387fd22..25ab929 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,11 +71,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
Integer getElasticsearchHttpPort();
void setElasticsearchHttpPort(Integer value);
- @Description("Tcp port for elasticsearch server")
- @Default.Integer(9300)
- Integer getElasticsearchTcpPort();
- void setElasticsearchTcpPort(Integer value);
-
+ /* Cassandra */
@Description("Host for Cassandra server (host name/ip address)")
@Default.String("cassandra-host")
String getCassandraHost();
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index f6ceef2..e3965dc 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -139,7 +139,7 @@ public class ElasticsearchIO {
private static final ObjectMapper mapper = new ObjectMapper();
- private static JsonNode parseResponse(Response response) throws IOException {
+ static JsonNode parseResponse(Response response) throws IOException {
return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
}
@@ -264,7 +264,7 @@ public class ElasticsearchIO {
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
- private RestClient createClient() throws MalformedURLException {
+ RestClient createClient() throws MalformedURLException {
HttpHost[] hosts = new HttpHost[getAddresses().size()];
int i = 0;
for (String address : getAddresses()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index b0d161f..203963d 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -17,19 +17,17 @@
*/
package org.apache.beam.sdk.io.elasticsearch;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.index.IndexNotFoundException;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
/** Test utilities to use with {@link ElasticsearchIO}. */
class ElasticSearchIOTestUtils {
@@ -41,57 +39,68 @@ class ElasticSearchIOTestUtils {
}
/** Deletes the given index synchronously. */
- static void deleteIndex(String index, Client client) throws Exception {
- IndicesAdminClient indices = client.admin().indices();
- IndicesExistsResponse indicesExistsResponse =
- indices.exists(new IndicesExistsRequest(index)).get();
- if (indicesExistsResponse.isExists()) {
- indices.prepareClose(index).get();
- indices.delete(Requests.deleteIndexRequest(index)).get();
+ static void deleteIndex(String index, RestClient restClient) throws IOException {
+ try {
+ restClient.performRequest("DELETE", String.format("/%s", index), new BasicHeader("", ""));
+ } catch (IOException e) {
+ // it is fine to ignore this exception as deleteIndex occurs in @Before,
+ // so when the first test is run, the index does not exist yet
+ if (!e.getMessage().contains("index_not_found_exception")){
+ throw e;
+ }
}
}
/** Inserts the given number of test documents into Elasticsearch. */
- static void insertTestDocuments(String index, String type, long numDocs, Client client)
- throws Exception {
- final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true);
+ static void insertTestDocuments(String index, String type, long numDocs, RestClient restClient)
+ throws IOException {
List<String> data =
ElasticSearchIOTestUtils.createDocuments(
numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+ StringBuilder bulkRequest = new StringBuilder();
for (String document : data) {
- bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document));
+ bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document));
}
- final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
- if (bulkResponse.hasFailures()) {
+ String endPoint = String.format("/%s/%s/_bulk", index, type);
+ HttpEntity requestBody =
+ new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
+ Response response = restClient.performRequest("POST", endPoint,
+ Collections.singletonMap("refresh", "true"), requestBody,
+ new BasicHeader("", ""));
+ JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+ boolean errors = searchResult.path("errors").asBoolean();
+ if (errors){
throw new IOException(
- String.format(
- "Cannot insert test documents in index %s : %s",
- index, bulkResponse.buildFailureMessage()));
+ String.format("Failed to insert test documents in index %s", index));
}
}
/**
- * Forces an upgrade of the given index to make recently inserted documents available for search.
+ * Forces a refresh of the given index to make recently inserted documents available for search.
*
* @return The number of docs in the index
*/
- static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) {
+ static long refreshIndexAndGetCurrentNumDocs(String index, String type, RestClient restClient)
+ throws IOException {
+ long result = 0;
try {
- client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
- SearchResponse response =
- client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
- return response.getHits().getTotalHits();
+ String endPoint = String.format("/%s/_refresh", index);
+ restClient.performRequest("POST", endPoint, new BasicHeader("", ""));
+
+ endPoint = String.format("/%s/%s/_search", index, type);
+ Response response = restClient.performRequest("GET", endPoint, new BasicHeader("", ""));
+ JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+ result = searchResult.path("hits").path("total").asLong();
+ } catch (IOException e) {
// it is fine to ignore the exceptions below because in testWriteWithBatchSize* sometimes,
// we call upgrade before any doc has been written
// (when there are fewer docs processed than batchSize).
// In that case index/type has not been created (created upon first doc insertion)
- } catch (IndexNotFoundException e) {
- } catch (java.lang.IllegalArgumentException e) {
- if (!e.getMessage().contains("No search type")) {
+ if (!e.getMessage().contains("index_not_found_exception")){
throw e;
}
}
- return 0;
+ return result;
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 2d6393a..7c37e87 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
*/
public class ElasticsearchIOIT {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOIT.class);
- private static TransportClient client;
+ private static RestClient restClient;
private static IOTestPipelineOptions options;
private static ElasticsearchIO.ConnectionConfiguration readConnectionConfiguration;
@Rule public TestPipeline pipeline = TestPipeline.create();
@@ -66,16 +66,16 @@ public class ElasticsearchIOIT {
public static void beforeClass() throws Exception {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
- client = ElasticsearchTestDataSet.getClient(options);
readConnectionConfiguration =
ElasticsearchTestDataSet.getConnectionConfiguration(
options, ElasticsearchTestDataSet.ReadOrWrite.READ);
+ restClient = readConnectionConfiguration.createClient();
}
@AfterClass
public static void afterClass() throws Exception {
- ElasticsearchTestDataSet.deleteIndex(client, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
- client.close();
+ ElasticsearchTestDataSet.deleteIndex(restClient, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+ restClient.close();
}
@Test
@@ -128,8 +128,8 @@ public class ElasticsearchIOIT {
pipeline.run();
long currentNumDocs =
- ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
- ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, client);
+ ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
+ ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, restClient);
assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 260af79..b349a29 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.PCollection;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
import org.hamcrest.CustomMatcher;
import org.junit.AfterClass;
import org.junit.Before;
@@ -74,9 +74,10 @@ public class ElasticsearchIOTest implements Serializable {
private static final long BATCH_SIZE_BYTES = 2048L;
private static Node node;
+ private static RestClient restClient;
private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
- @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@Rule
public TestPipeline pipeline = TestPipeline.create();
@@ -91,8 +92,8 @@ public class ElasticsearchIOTest implements Serializable {
.put("cluster.name", "beam")
.put("http.enabled", "true")
.put("node.data", "true")
- .put("path.data", folder.getRoot().getPath())
- .put("path.home", folder.getRoot().getPath())
+ .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
+ .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
.put("node.name", "beam")
.put("network.host", ES_IP)
.put("http.port", esHttpPort)
@@ -100,27 +101,29 @@ public class ElasticsearchIOTest implements Serializable {
// had problems with some jdk, embedded ES was too slow for bulk insertion,
// and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
.put("threadpool.bulk.queue_size", 100);
- node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
+ node = new Node(settingsBuilder.build());
LOG.info("Elasticsearch node created");
node.start();
connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
+ restClient = connectionConfiguration.createClient();
}
@AfterClass
- public static void afterClass() {
+ public static void afterClass() throws IOException{
+ restClient.close();
node.close();
}
@Before
public void before() throws Exception {
- ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
+ ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient);
}
@Test
public void testSizes() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
PipelineOptions options = PipelineOptionsFactory.create();
ElasticsearchIO.Read read =
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
@@ -134,7 +137,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testRead() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
PCollection<String> output =
pipeline.apply(
@@ -150,7 +153,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testReadWithQuery() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
String query =
"{\n"
@@ -185,7 +188,7 @@ public class ElasticsearchIOTest implements Serializable {
pipeline.run();
long currentNumDocs =
- ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
+ ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
assertEquals(NUM_DOCS, currentNumDocs);
QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist");
@@ -258,9 +261,8 @@ public class ElasticsearchIOTest implements Serializable {
if ((numDocsProcessed % 100) == 0) {
// force the index to upgrade after inserting for the inserted docs
// to be searchable immediately
- long currentNumDocs =
- ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
- ES_INDEX, ES_TYPE, node.client());
+ long currentNumDocs = ElasticSearchIOTestUtils
+ .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
if ((numDocsProcessed % BATCH_SIZE) == 0) {
/* bundle end */
assertEquals(
@@ -304,8 +306,8 @@ public class ElasticsearchIOTest implements Serializable {
// force the index to upgrade after inserting for the inserted docs
// to be searchable immediately
long currentNumDocs =
- ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
- ES_INDEX, ES_TYPE, node.client());
+ ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
+ ES_INDEX, ES_TYPE, restClient);
if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
/* bundle end */
assertThat(
@@ -327,7 +329,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testSplit() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
PipelineOptions options = PipelineOptionsFactory.create();
ElasticsearchIO.Read read =
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
http://git-wip-us.apache.org/repos/asf/beam/blob/7caea7a8/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 3a9aae6..2a2dbe9 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -17,13 +17,11 @@
*/
package org.apache.beam.sdk.io.elasticsearch;
-import static java.net.InetAddress.getByName;
import java.io.IOException;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.client.RestClient;
/**
* Manipulates test data used by the {@link ElasticsearchIO}
@@ -51,7 +49,6 @@ public class ElasticsearchTestDataSet {
* -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \
* -Dexec.args="--elasticsearchServer=1.2.3.4 \
* --elasticsearchHttpPort=9200 \
- * --elasticsearchTcpPort=9300" \
* -Dexec.classpathScope=test
* </pre>
*
@@ -62,29 +59,20 @@ public class ElasticsearchTestDataSet {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-
- createAndPopulateIndex(getClient(options), ReadOrWrite.READ);
+ createAndPopulateReadIndex(options);
}
- private static void createAndPopulateIndex(TransportClient client, ReadOrWrite rOw)
- throws Exception {
+ private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
+ RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient();
// automatically creates the index and insert docs
- ElasticSearchIOTestUtils.insertTestDocuments(
- (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, client);
- }
-
- public static TransportClient getClient(IOTestPipelineOptions options) throws Exception {
- TransportClient client =
- TransportClient.builder()
- .build()
- .addTransportAddress(
- new InetSocketTransportAddress(
- getByName(options.getElasticsearchServer()),
- options.getElasticsearchTcpPort()));
- return client;
+ try {
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+ } finally {
+ restClient.close();
+ }
}
- public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
+ static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
@@ -99,8 +87,9 @@ public class ElasticsearchTestDataSet {
return connectionConfiguration;
}
- public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception {
- ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client);
+ static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws Exception {
+ ElasticSearchIOTestUtils
+ .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, restClient);
}
/** Enum that tells whether we use the index for reading or for writing. */