You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/30 14:46:02 UTC
git commit: Fixed build issues with PR#45
Repository: incubator-streams
Updated Branches:
refs/heads/master 7d65fb26d -> d91c4a444
Fixed build issues with PR#45
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d91c4a44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d91c4a44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d91c4a44
Branch: refs/heads/master
Commit: d91c4a444cba9bc805e3b04fd1914e86aa8c194f
Parents: 7d65fb2
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:45:55 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:45:55 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchPersistUpdater.java | 35 ------------
.../ElasticsearchPersistWriterTask.java | 56 --------------------
.../ElasticsearchWriterConfiguration.json | 5 ++
3 files changed, 5 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 6982862..b2e7556 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -36,7 +36,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
@@ -44,8 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,44 +100,16 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
this.batchSize = batchSize;
}
- public void setVeryLargeBulk(boolean veryLargeBulk) {
- this.veryLargeBulk = veryLargeBulk;
- }
-
private final List<String> affectedIndexes = new ArrayList<String>();
- public int getTotalOutstanding() {
- return this.totalSent - (this.totalFailed + this.totalOk);
- }
-
public long getFlushThresholdSizeInBytes() {
return flushThresholdSizeInBytes;
}
- public int getTotalSent() {
- return totalSent;
- }
-
- public int getTotalSeconds() {
- return totalSeconds;
- }
-
- public int getTotalOk() {
- return totalOk;
- }
-
- public int getTotalFailed() {
- return totalFailed;
- }
-
public int getTotalBatchCount() {
return totalBatchCount;
}
- public long getTotalSizeInBytes() {
- return totalSizeInBytes;
- }
-
public long getBatchSizeInBytes() {
return batchSizeInBytes;
}
@@ -149,10 +118,6 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
return batchItemsSent;
}
- public List<String> getAffectedIndexes() {
- return this.affectedIndexes;
- }
-
public void setFlushThresholdSizeInBytes(long sizeInBytes) {
this.flushThresholdSizeInBytes = sizeInBytes;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
deleted file mode 100644
index 824f1ac..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class ElasticsearchPersistWriterTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
-
- private ElasticsearchPersistWriter writer;
-
- public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public void run() {
-
- while(true) {
- if( writer.persistQueue.peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(1));
- } catch (InterruptedException e) {}
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index b107be6..a38ff85 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,6 +24,11 @@
"description": "Item Count before flush",
"default": 100
},
+ "batchBytes": {
+ "type": "integer",
+ "description": "Number of bytes before flush",
+ "default": 5242880
+ },
"maxTimeBetweenFlushMs": {
"type": "integer"
}