You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/30 14:46:02 UTC

git commit: Fixed build issues with PR#45

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7d65fb26d -> d91c4a444


Fixed build issues with PR#45


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d91c4a44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d91c4a44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d91c4a44

Branch: refs/heads/master
Commit: d91c4a444cba9bc805e3b04fd1914e86aa8c194f
Parents: 7d65fb2
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:45:55 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:45:55 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistUpdater.java            | 35 ------------
 .../ElasticsearchPersistWriterTask.java         | 56 --------------------
 .../ElasticsearchWriterConfiguration.json       |  5 ++
 3 files changed, 5 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 6982862..b2e7556 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -36,7 +36,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
@@ -44,8 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,44 +100,16 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         this.batchSize = batchSize;
     }
 
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
-
     private final List<String> affectedIndexes = new ArrayList<String>();
 
-    public int getTotalOutstanding() {
-        return this.totalSent - (this.totalFailed + this.totalOk);
-    }
-
     public long getFlushThresholdSizeInBytes() {
         return flushThresholdSizeInBytes;
     }
 
-    public int getTotalSent() {
-        return totalSent;
-    }
-
-    public int getTotalSeconds() {
-        return totalSeconds;
-    }
-
-    public int getTotalOk() {
-        return totalOk;
-    }
-
-    public int getTotalFailed() {
-        return totalFailed;
-    }
-
     public int getTotalBatchCount() {
         return totalBatchCount;
     }
 
-    public long getTotalSizeInBytes() {
-        return totalSizeInBytes;
-    }
-
     public long getBatchSizeInBytes() {
         return batchSizeInBytes;
     }
@@ -149,10 +118,6 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         return batchItemsSent;
     }
 
-    public List<String> getAffectedIndexes() {
-        return this.affectedIndexes;
-    }
-
     public void setFlushThresholdSizeInBytes(long sizeInBytes) {
         this.flushThresholdSizeInBytes = sizeInBytes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
deleted file mode 100644
index 824f1ac..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class ElasticsearchPersistWriterTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
-
-    private ElasticsearchPersistWriter writer;
-
-    public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index b107be6..a38ff85 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,6 +24,11 @@
             "description": "Item Count before flush",
             "default": 100
         },
+		"batchBytes": {
+			"type": "integer",
+			"description": "Number of bytes before flush",
+			"default": 5242880
+		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
 		}