You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/01 22:11:36 UTC
incubator-metron git commit: Weird mac issue..the rename of this file
didn't work because the FS is case insensitive,
so git didn't register that the file was changed.
Repository: incubator-metron
Updated Branches:
refs/heads/master 3be012db9 -> 6638a71ad
Weird Mac issue: the rename of ElasticSearchWriter.java to ElasticsearchWriter.java didn't take effect because the filesystem is case-insensitive, so git didn't register that the file had changed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/6638a71a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/6638a71a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/6638a71a
Branch: refs/heads/master
Commit: 6638a71ad22a319051083b7344f19ab837346a43
Parents: 3be012d
Author: cstella <ce...@gmail.com>
Authored: Tue Mar 1 16:11:29 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 1 16:11:29 2016 -0500
----------------------------------------------------------------------
.../metron/writer/ElasticSearchWriter.java | 95 --------------------
.../metron/writer/ElasticsearchWriter.java | 95 ++++++++++++++++++++
2 files changed, 95 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6638a71a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticSearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticSearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticSearchWriter.java
deleted file mode 100644
index a0df685..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticSearchWriter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
-import org.apache.metron.writer.interfaces.BulkMessageWriter;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
-
- private String clusterName;
- private Map<String, String> optionalSettings;
- private transient TransportClient client;
- private String host;
- private int port;
- private SimpleDateFormat dateFormat;
-
- public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
- this.clusterName = clusterName;
- this.host = host;
- this.port = port;
- this.dateFormat = new SimpleDateFormat(dateFormat);
- }
-
- public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
- this.optionalSettings = optionalSettings;
- return this;
- }
-
- @Override
- public void init() {
- ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
- builder.put("cluster.name", clusterName);
- builder.put("client.transport.ping_timeout","500s");
- if (optionalSettings != null) {
- builder.put(optionalSettings);
- }
- client = new TransportClient(builder.build())
- .addTransportAddress(new InetSocketTransportAddress(host, port));
-
- }
-
- @Override
- public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
- String indexPostfix = dateFormat.format(new Date());
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- for(JSONObject message: messages) {
- String indexName = sourceType;
- if (configuration != null) {
- indexName = configuration.getIndex();
- }
- IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_" + indexPostfix,
- sourceType);
- indexRequestBuilder.setSource(message.toJSONString());
- bulkRequest.add(indexRequestBuilder);
- }
- BulkResponse resp = bulkRequest.execute().actionGet();
- if (resp.hasFailures()) {
- throw new Exception(resp.buildFailureMessage());
- }
- }
-
- @Override
- public void close() throws Exception {
- client.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6638a71a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
new file mode 100644
index 0000000..a0df685
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
+
+ private String clusterName;
+ private Map<String, String> optionalSettings;
+ private transient TransportClient client;
+ private String host;
+ private int port;
+ private SimpleDateFormat dateFormat;
+
+ public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
+ this.clusterName = clusterName;
+ this.host = host;
+ this.port = port;
+ this.dateFormat = new SimpleDateFormat(dateFormat);
+ }
+
+ public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
+ this.optionalSettings = optionalSettings;
+ return this;
+ }
+
+ @Override
+ public void init() {
+ ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+ builder.put("cluster.name", clusterName);
+ builder.put("client.transport.ping_timeout","500s");
+ if (optionalSettings != null) {
+ builder.put(optionalSettings);
+ }
+ client = new TransportClient(builder.build())
+ .addTransportAddress(new InetSocketTransportAddress(host, port));
+
+ }
+
+ @Override
+ public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ String indexPostfix = dateFormat.format(new Date());
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ for(JSONObject message: messages) {
+ String indexName = sourceType;
+ if (configuration != null) {
+ indexName = configuration.getIndex();
+ }
+ IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_" + indexPostfix,
+ sourceType);
+ indexRequestBuilder.setSource(message.toJSONString());
+ bulkRequest.add(indexRequestBuilder);
+ }
+ BulkResponse resp = bulkRequest.execute().actionGet();
+ if (resp.hasFailures()) {
+ throw new Exception(resp.buildFailureMessage());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+}