Posted to dev@streams.apache.org by smashew <gi...@git.apache.org> on 2014/06/26 22:23:11 UTC

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

GitHub user smashew opened a pull request:

    https://github.com/apache/incubator-streams/pull/45

    Fixed the ElasticSearchPersistWriter

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smashew/incubator-streams Streams-ES-Writer-Fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-streams/pull/45.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #45
    
----
commit 1d36ab61e1046ec8c197a9be23b3234fed4cf56c
Author: Matthew Hager <ma...@gmail.com>
Date:   2014-06-26T20:16:24Z

    Fixed the ElasticSearchPersistWriter

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296480
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -575,102 +452,90 @@ private void checkThenAddBatch(String index, String type, Map<String, String> wo
     
             return toReturn;
         }
    +    */
     
    -    @Override
         public void prepare(Object configurationObject) {
    -        mapper = StreamsJacksonMapper.getInstance();
    -        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
    -        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
    -        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
    -        start();
    -    }
    +        this.veryLargeBulk = config.getBulk() == null ?
    +                Boolean.FALSE :
    +                config.getBulk();
     
    -    /**
    -     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
    -     * @param bulkRequest
    -     * @param thisSent
    -     * @param thisSizeInBytes
    -     */
    -    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
    -        LOGGER.debug("Attempting to write {} items to ES", thisSent);
    -        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
    -        this.addResponseFuture(responseFuture);
    -        responseFuture.addListener(new ActionListener<BulkResponse>() {
    -            @Override
    -            public void onResponse(BulkResponse bulkItemResponses) {
    -                lastWrite.set(System.currentTimeMillis());
    -                removeResponseFuture(responseFuture);
    -
    -                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
    -            }
    +        this.flushThresholdsRecords = config.getBatchSize() == null ?
    +                DEFAULT_BATCH_SIZE :
    +                (int)(config.getBatchSize().longValue());
     
    -            @Override
    -            public void onFailure(Throwable e) {
    -                LOGGER.error("Error bulk loading: {}", e.getMessage());
    -                removeResponseFuture(responseFuture);
    -                e.printStackTrace();
    -            }
    -        });
    -    }
    +        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
    +                config.getMaxTimeBetweenFlushMs() :
    +                DEFAULT_MAX_WAIT;
     
    -    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
    -        if (bulkItemResponses.hasFailures())
    -            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
    +        this.flushThresholdBytes = config.getBatchBytes() == null ?
    +                DEFAULT_BULK_FLUSH_THRESHOLD :
    +                config.getBatchBytes();
     
    -        long thisFailed = 0;
    -        long thisOk = 0;
    -        long thisMillis = bulkItemResponses.getTookInMillis();
    +        timer.scheduleAtFixedRate(new TimerTask() {
    +            public void run() {
    +                checkForFlush();
    +            }
    +        }, this.flushThresholdTime, this.flushThresholdTime);
     
    -        // keep track of the number of totalFailed and items that we have totalOk.
    -        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
    -            if (resp.isFailed())
    -                thisFailed++;
    -            else
    -                thisOk++;
    -        }
    +    }
     
    -        synchronized(countLock) {
    -            totalAttempted += thisSent;
    -            totalOk += thisOk;
    -            totalFailed += thisFailed;
    -            totalSeconds += (thisMillis / 1000);
    -            lock.writeLock().unlock();
    -        }
    +    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
    +        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
     
    -        if (thisSent != (thisOk + thisFailed))
    -            LOGGER.error("We sent more items than this");
     
    -        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
    -                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
    -                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
    +        // record the last time we flushed the index
    +        this.lastFlush = new Date().getTime();
     
    -    }
    +        // add the totals
    +        this.totalSent.addAndGet(sent);
     
    +        // add the total number of batches sent
    +        this.batchesSent.incrementAndGet();
     
    -    private void checkAndCreateBulkRequest() {
    -        // Synchronize to ensure that we don't lose any records
    -        lock.writeLock().lock();
             try {
    -            if (bulkRequest == null)
    -                bulkRequest = this.manager.getClient().prepareBulk();
    -        } finally {
    -            lock.writeLock().unlock();
    -        }
    -    }
    +            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
    +                public void onResponse(BulkResponse bulkItemResponses) {
    +                    batchesResponded.incrementAndGet();
    +                    updateTotals(bulkItemResponses, sent, sizeInBytes);
    +                }
     
    -    //Locking on a separate object than the writer as these objects are intended to be handled separately
    -    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
    -        synchronized (requestLock) {
    -            this.responses.add(future);
    +                public void onFailure(Throwable throwable) {
    +                    batchesResponded.incrementAndGet();
    +                    throwable.printStackTrace();
    --- End diff --
    
    I can log it.
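
    A minimal sketch of what that might look like, using the class's existing SLF4J LOGGER in place of printStackTrace (an illustration, not the committed change):

        public void onFailure(Throwable throwable) {
            batchesResponded.incrementAndGet();
            // log the failure and its stack trace instead of printing to stderr
            LOGGER.error("Error bulk loading: {}", throwable.getMessage(), throwable);
        }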



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14290587
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    -                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
    +        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
    +        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
    +            return docAsJson;
    +        else {
    +            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
    +            try {
    +                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
                 }
    -            if (streamsDatum.getDocument() instanceof String)
    -                json = streamsDatum.getDocument().toString();
    -            else {
    -                json = mapper.writeValueAsString(streamsDatum.getDocument());
    +            catch(Throwable e) {
    +                LOGGER.warn("Unable to write metadata");
                 }
    -
    -            add(config.getIndex(), config.getType(), id, ts, json);
    -
    -        } catch (Exception e) {
    -            LOGGER.warn("{} {}", e.getMessage());
    -            e.printStackTrace();
    +            return OBJECT_MAPPER.writeValueAsString(node);
             }
         }
     
         public void cleanUp() {
    -
             try {
    -            flush();
    -            backgroundFlushTask.shutdownNow();
    -        } catch (IOException e) {
    -            e.printStackTrace();
    -        }
    -        close();
    -    }
     
    -    @Override
    -    public void close() {
    -        try {
                 // before they close, check to ensure that
    -            this.flush();
    -
    -            this.lock.writeLock().lock();
    -
    -            int count = 0;
    -            // We are going to give it 5 minutes.
    -            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
    -                for(ListenableActionFuture<BulkResponse> future : responses) {
    -                    if(future.isDone() || future.isCancelled()) {
    -                        BulkResponse response = future.get();
    -                        LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
    -                        updateTotals(response, 0, 0);
    -                    }
    -                }
    -                Thread.sleep(50);
    -            }
    -
    -            if (this.getTotalOutstanding() > 0) {
    -                LOGGER.error("We never cleared our buffer");
    -            }
    +            flushInternal();
     
    +            waitToCatchUp(0, 5 * 60 * 1000);
    +            refreshIndexes();
     
    -            for (String indexName : this.getAffectedIndexes()) {
    -                createIndexIfMissing(indexName);
    +            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
     
    -                if (this.veryLargeBulk) {
    -                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    -                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    -                    // refreshing back on.
    -                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
    +        } catch (Throwable e) {
    +            // this line of code should be logically unreachable.
    +            LOGGER.warn("This is unexpected: {}", e.getMessage());
    +            e.printStackTrace();
    +        }
    +    }
     
    -                    // submit to ElasticSearch
    -                    this.manager.getClient()
    -                            .admin()
    -                            .indices()
    -                            .updateSettings(updateSettingsRequest)
    -                            .actionGet();
    -                }
    +    private void refreshIndexes() {
    +        for (String indexName : this.affectedIndexes) {
     
    -                checkIndexImplications(indexName);
    +            if (this.veryLargeBulk) {
    +                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    +                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    +                // refreshing back on.
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
     
    -                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +                // submit to ElasticSearch
                     this.manager.getClient()
                             .admin()
                             .indices()
    -                        .prepareRefresh(indexName)
    -                        .execute()
    +                        .updateSettings(updateSettingsRequest)
                             .actionGet();
                 }
     
    -            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
    +            checkIndexImplications(indexName);
     
    -        } catch (Exception e) {
    -            // this line of code should be logically unreachable.
    -            LOGGER.warn("This is unexpected: {}", e.getMessage());
    -            e.printStackTrace();
    -        } finally {
    -            this.lock.writeLock().unlock();
    +            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +            this.manager.getClient()
    +                    .admin()
    +                    .indices()
    +                    .prepareRefresh(indexName)
    +                    .execute()
    +                    .actionGet();
             }
         }
     
         @Override
    -    public void flush() throws IOException {
    -        flushInternal();
    -    }
    -
    -    @Override
         public DatumStatusCounter getDatumStatusCounter() {
             DatumStatusCounter counters = new DatumStatusCounter();
    -        counters.incrementAttempt(this.batchItemsSent);
    -        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
    -        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
    +        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
    +        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
             return counters;
         }
     
    -    public void start() {
    -        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
    -            @Override
    -            public void run() {
    -                LOGGER.debug("Checking to see if data needs to be flushed");
    -                long time = System.currentTimeMillis() - lastWrite.get();
    -                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
    -                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
    -                    flushInternal();
    -                }
    -            }
    -        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
    -        manager = new ElasticsearchClientManager(config);
    -        client = manager.getClient();
    -
    -        LOGGER.info(client.toString());
    -    }
    -
    -    public void flushInternal() {
    -        lock.writeLock().lock();
    +    private synchronized void flushInternal() {
             // we do not have a working bulk request, we can just exit here.
    -        if (this.bulkRequest == null || batchItemsSent == 0)
    +        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
                 return;
     
    +        // wait for one minute to catch up if it needs to
    +        waitToCatchUp(5, 1 * 60 * 1000);
    +
             // call the flush command.
    -        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
    +        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
     
    -        // null the flush request, this will be created in the 'add' function below
    -        this.bulkRequest = null;
    +        // reset the current batch statistics
    +        this.currentBatchItems.set(0);
    +        this.currentBatchBytes.set(0);
     
    -        // record the proper statistics, and add it to our totals.
    -        this.totalSizeInBytes += this.batchSizeInBytes;
    -        this.totalSent += batchItemsSent;
    +        // reset our bulk request builder
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
    +    }
     
    -        // reset the current batch statistics
    -        this.batchSizeInBytes = 0;
    -        this.batchItemsSent = 0;
    +    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
    +        int counter = 0;
    +        // If we still have 5 batches outstanding, we need to give it a minute to catch up
    +        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
    +            try {
    +                Thread.yield();
    +                Thread.sleep(1);
    +                timeOutThresholdInMS++;
    --- End diff --
    
    Why only sleep 1 ms? You are going to spend a lot of CPU time in that wait.
    
    Also, why are you incrementing the timeout threshold?



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296315
  
    
    The calculations happen in fractions of a millisecond. Sleeping for only 1 ms lets the writer start writing again as quickly as possible, rather than using a longer wait and sitting idle when it could be working.
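
    For reference, a minimal sketch (not the code in this PR) of how the loop could keep the short sleep while still honoring the timeout, assuming the intent was for the elapsed counter rather than the threshold to advance:

        private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
            int elapsedMs = 0;
            // block while too many batches are outstanding, up to the timeout
            while (this.getBatchesSent() - this.getBatchesResponded() > batchThreshold
                    && elapsedMs < timeOutThresholdInMS) {
                try {
                    Thread.sleep(1);  // short sleep keeps resume latency low
                    elapsedMs++;      // advance toward the timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }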



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296347
  
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    I re-wrote the entire thing to make it work properly; any 'dropping' was unintentional.



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300496
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
    --- End diff --
    
    Ok, so the behavior is intended.  There is no need to ensure that this method is the ONLY thing executed by the class at one time.  
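
    A minimal sketch of the distinction under discussion (class and method names here are illustrative, not from the patch). Synchronizing on the instance only serializes threads that share a single writer; synchronizing on the class serializes every writer in the JVM, which is only needed when the guarded state is shared across instances.

        import java.util.ArrayList;
        import java.util.List;

        public class SyncScopeSketch {

            private final List<String> affectedIndexes = new ArrayList<String>();

            // Instance-level lock: only threads using this particular writer block each other.
            public void checkIndexInstanceScoped(String indexName) {
                synchronized (this) {
                    if (!affectedIndexes.contains(indexName)) {
                        affectedIndexes.add(indexName);
                    }
                }
            }

            // Class-level lock: every instance in the JVM contends for the same monitor;
            // that is only needed when the guarded state is shared across instances.
            public void checkIndexClassScoped(String indexName) {
                synchronized (SyncScopeSketch.class) {
                    if (!affectedIndexes.contains(indexName)) {
                        affectedIndexes.add(indexName);
                    }
                }
            }
        }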



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14290782
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
     
    -        // check to see if we have seen this index before.
    -        if(this.affectedIndexes.contains(indexName))
    -            return;
    +            // this will be common if we have already verified the index.
    +            if (this.affectedIndexes.contains(indexName))
    +                return;
     
    -        // we haven't log this index.
    -        this.affectedIndexes.add(indexName);
     
    -        // Check to see if we are in 'veryLargeBulk' mode
    -        // if we aren't, exit early
    -        if(!this.veryLargeBulk)
    -            return;
    +            // create the index if it is missing
    +            createIndexIfMissing(indexName);
     
    +            // we haven't log this index.
    +            this.affectedIndexes.add(indexName);
     
    -        // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +            // Check to see if we are in 'veryLargeBulk' mode
    +            // if we aren't, exit early
    +            if (this.veryLargeBulk) {
     
    -        // Create a request then add the setting to tell it to stop refreshing the interval
    -        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
    +                // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +                // Create a request then add the setting to tell it to stop refreshing the interval
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
     
    -        // submit to ElasticSearch
    -        this.manager.getClient()
    -                .admin()
    -                .indices()
    -                .updateSettings(updateSettingsRequest)
    -                .actionGet();
    +                // submit to ElasticSearch
    +                this.manager.getClient()
    +                        .admin()
    +                        .indices()
    +                        .updateSettings(updateSettingsRequest)
    +                        .actionGet();
    +            }
    +        }
         }
     
         public void createIndexIfMissing(String indexName) {
    +        // Synchronize this on a static class level
    --- End diff --
    
    I don't see any synchronization



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296430
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
     
    -        // check to see if we have seen this index before.
    -        if(this.affectedIndexes.contains(indexName))
    -            return;
    +            // this will be common if we have already verified the index.
    +            if (this.affectedIndexes.contains(indexName))
    +                return;
     
    -        // we haven't log this index.
    -        this.affectedIndexes.add(indexName);
     
    -        // Check to see if we are in 'veryLargeBulk' mode
    -        // if we aren't, exit early
    -        if(!this.veryLargeBulk)
    -            return;
    +            // create the index if it is missing
    +            createIndexIfMissing(indexName);
     
    +            // we haven't log this index.
    +            this.affectedIndexes.add(indexName);
     
    -        // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +            // Check to see if we are in 'veryLargeBulk' mode
    +            // if we aren't, exit early
    +            if (this.veryLargeBulk) {
     
    -        // Create a request then add the setting to tell it to stop refreshing the interval
    -        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
    +                // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +                // Create a request then add the setting to tell it to stop refreshing the interval
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
     
    -        // submit to ElasticSearch
    -        this.manager.getClient()
    -                .admin()
    -                .indices()
    -                .updateSettings(updateSettingsRequest)
    -                .actionGet();
    +                // submit to ElasticSearch
    +                this.manager.getClient()
    +                        .admin()
    +                        .indices()
    +                        .updateSettings(updateSettingsRequest)
    +                        .actionGet();
    +            }
    +        }
         }
     
         public void createIndexIfMissing(String indexName) {
    +        // Synchronize this on a static class level
    --- End diff --
    
    It is synchronized by the method that calls this, so it doesn't need to be synchronized here.
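
    A minimal sketch of the calling pattern being described (names are illustrative, not the patch itself): Java intrinsic locks are held across nested calls, so a method reached only through a synchronized caller is already covered; it is unguarded only if other code invokes it directly.

        public class CallerHoldsLockSketch {

            // The caller acquires the class-level monitor, so everything it invokes,
            // including ensureIndex below, runs while that lock is held.
            public void checkIndex(String indexName) {
                synchronized (CallerHoldsLockSketch.class) {
                    ensureIndex(indexName);
                }
            }

            // No synchronized block here: covered when reached through checkIndex,
            // but unguarded if other code calls it directly.
            public void ensureIndex(String indexName) {
                System.out.println("ensuring index " + indexName);
            }
        }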



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14312013
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    making code smaller since 1998



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14291458
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -575,102 +452,90 @@ private void checkThenAddBatch(String index, String type, Map<String, String> wo
     
             return toReturn;
         }
    +    */
     
    -    @Override
         public void prepare(Object configurationObject) {
    -        mapper = StreamsJacksonMapper.getInstance();
    -        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
    -        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
    -        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
    -        start();
    -    }
    +        this.veryLargeBulk = config.getBulk() == null ?
    +                Boolean.FALSE :
    +                config.getBulk();
     
    -    /**
    -     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
    -     * @param bulkRequest
    -     * @param thisSent
    -     * @param thisSizeInBytes
    -     */
    -    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
    -        LOGGER.debug("Attempting to write {} items to ES", thisSent);
    -        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
    -        this.addResponseFuture(responseFuture);
    -        responseFuture.addListener(new ActionListener<BulkResponse>() {
    -            @Override
    -            public void onResponse(BulkResponse bulkItemResponses) {
    -                lastWrite.set(System.currentTimeMillis());
    -                removeResponseFuture(responseFuture);
    -
    -                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
    -            }
    +        this.flushThresholdsRecords = config.getBatchSize() == null ?
    +                DEFAULT_BATCH_SIZE :
    +                (int)(config.getBatchSize().longValue());
     
    -            @Override
    -            public void onFailure(Throwable e) {
    -                LOGGER.error("Error bulk loading: {}", e.getMessage());
    -                removeResponseFuture(responseFuture);
    -                e.printStackTrace();
    -            }
    -        });
    -    }
    +        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
    +                config.getMaxTimeBetweenFlushMs() :
    +                DEFAULT_MAX_WAIT;
     
    -    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
    -        if (bulkItemResponses.hasFailures())
    -            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
    +        this.flushThresholdBytes = config.getBatchBytes() == null ?
    +                DEFAULT_BULK_FLUSH_THRESHOLD :
    +                config.getBatchBytes();
     
    -        long thisFailed = 0;
    -        long thisOk = 0;
    -        long thisMillis = bulkItemResponses.getTookInMillis();
    +        timer.scheduleAtFixedRate(new TimerTask() {
    +            public void run() {
    +                checkForFlush();
    +            }
    +        }, this.flushThresholdTime, this.flushThresholdTime);
     
    -        // keep track of the number of totalFailed and items that we have totalOk.
    -        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
    -            if (resp.isFailed())
    -                thisFailed++;
    -            else
    -                thisOk++;
    -        }
    +    }
     
    -        synchronized(countLock) {
    -            totalAttempted += thisSent;
    -            totalOk += thisOk;
    -            totalFailed += thisFailed;
    -            totalSeconds += (thisMillis / 1000);
    -            lock.writeLock().unlock();
    -        }
    +    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
    +        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
     
    -        if (thisSent != (thisOk + thisFailed))
    -            LOGGER.error("We sent more items than this");
     
    -        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
    -                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
    -                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
    +        // record the last time we flushed the index
    +        this.lastFlush = new Date().getTime();
     
    -    }
    +        // add the totals
    +        this.totalSent.addAndGet(sent);
     
    +        // add the total number of batches sent
    +        this.batchesSent.incrementAndGet();
     
    -    private void checkAndCreateBulkRequest() {
    -        // Synchronize to ensure that we don't lose any records
    -        lock.writeLock().lock();
             try {
    -            if (bulkRequest == null)
    -                bulkRequest = this.manager.getClient().prepareBulk();
    -        } finally {
    -            lock.writeLock().unlock();
    -        }
    -    }
    +            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
    +                public void onResponse(BulkResponse bulkItemResponses) {
    +                    batchesResponded.incrementAndGet();
    +                    updateTotals(bulkItemResponses, sent, sizeInBytes);
    +                }
     
    -    //Locking on a separate object than the writer as these objects are intended to be handled separately
    -    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
    -        synchronized (requestLock) {
    -            this.responses.add(future);
    +                public void onFailure(Throwable throwable) {
    +                    batchesResponded.incrementAndGet();
    +                    throwable.printStackTrace();
    --- End diff --
    
    log instead of print
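
    A minimal sketch of the suggested change, assuming the class's existing SLF4J logger (the listener shape below is illustrative):

        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        public class FailureLoggingSketch {

            private static final Logger LOGGER = LoggerFactory.getLogger(FailureLoggingSketch.class);

            public void onFailure(Throwable throwable) {
                // Route the failure through the logger instead of throwable.printStackTrace(),
                // so the stack trace reaches the configured appenders rather than stderr.
                LOGGER.error("Error bulk loading", throwable);
            }
        }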



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14290293
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    Any particular reason to drop the timestamp?  I believe there are people relying on this functionality.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on the pull request:

    https://github.com/apache/incubator-streams/pull/45#issuecomment-47373849
  
    thx, when this is merged, I have about 20 tests written to verify that this works properly under a wide variety of conditions. I will submit the new version of the local stream builder and the corresponding tests as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300444
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
     
    -        // check to see if we have seen this index before.
    -        if(this.affectedIndexes.contains(indexName))
    -            return;
    +            // this will be common if we have already verified the index.
    +            if (this.affectedIndexes.contains(indexName))
    +                return;
     
    -        // we haven't log this index.
    -        this.affectedIndexes.add(indexName);
     
    -        // Check to see if we are in 'veryLargeBulk' mode
    -        // if we aren't, exit early
    -        if(!this.veryLargeBulk)
    -            return;
    +            // create the index if it is missing
    +            createIndexIfMissing(indexName);
     
    +            // we haven't log this index.
    +            this.affectedIndexes.add(indexName);
     
    -        // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +            // Check to see if we are in 'veryLargeBulk' mode
    +            // if we aren't, exit early
    +            if (this.veryLargeBulk) {
     
    -        // Create a request then add the setting to tell it to stop refreshing the interval
    -        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
    +                // They are in 'very large bulk' mode we want to turn off refreshing the index.
    +                // Create a request then add the setting to tell it to stop refreshing the interval
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
     
    -        // submit to ElasticSearch
    -        this.manager.getClient()
    -                .admin()
    -                .indices()
    -                .updateSettings(updateSettingsRequest)
    -                .actionGet();
    +                // submit to ElasticSearch
    +                this.manager.getClient()
    +                        .admin()
    +                        .indices()
    +                        .updateSettings(updateSettingsRequest)
    +                        .actionGet();
    +            }
    +        }
         }
     
         public void createIndexIfMissing(String indexName) {
    +        // Synchronize this on a static class level
    --- End diff --
    
    this is a public method.  What prevents others from calling it?  Also, I would remove the comment as it is misleading.
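    
    One way to address this, as a rough sketch (the prepareExists/prepareCreate calls below are my assumption about the ES admin client in use, not code from this patch), would be to take the class-level lock inside the public method itself so external callers get the same protection:
    
        public void createIndexIfMissing(String indexName) {
            // hold the class-level lock here so any caller, internal or external,
            // is covered by the same synchronization
            synchronized (ElasticsearchPersistWriter.class) {
                boolean exists = this.manager.getClient()
                        .admin()
                        .indices()
                        .prepareExists(indexName)
                        .execute()
                        .actionGet()
                        .isExists();
    
                if (!exists) {
                    this.manager.getClient()
                            .admin()
                            .indices()
                            .prepareCreate(indexName)
                            .execute()
                            .actionGet();
                }
            }
        }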


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on the pull request:

    https://github.com/apache/incubator-streams/pull/45#issuecomment-47370286
  
    :+1: I will commit this today


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300390
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    -                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
    +        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
    +        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
    +            return docAsJson;
    +        else {
    +            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
    +            try {
    +                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
                 }
    -            if (streamsDatum.getDocument() instanceof String)
    -                json = streamsDatum.getDocument().toString();
    -            else {
    -                json = mapper.writeValueAsString(streamsDatum.getDocument());
    +            catch(Throwable e) {
    +                LOGGER.warn("Unable to write metadata");
                 }
    -
    -            add(config.getIndex(), config.getType(), id, ts, json);
    -
    -        } catch (Exception e) {
    -            LOGGER.warn("{} {}", e.getMessage());
    -            e.printStackTrace();
    +            return OBJECT_MAPPER.writeValueAsString(node);
             }
         }
     
         public void cleanUp() {
    -
             try {
    -            flush();
    -            backgroundFlushTask.shutdownNow();
    -        } catch (IOException e) {
    -            e.printStackTrace();
    -        }
    -        close();
    -    }
     
    -    @Override
    -    public void close() {
    -        try {
                 // before they close, check to ensure that
    -            this.flush();
    -
    -            this.lock.writeLock().lock();
    -
    -            int count = 0;
    -            // We are going to give it 5 minutes.
    -            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
    -                for(ListenableActionFuture<BulkResponse> future : responses) {
    -                    if(future.isDone() || future.isCancelled()) {
    -                        BulkResponse response = future.get();
    -                        LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
    -                        updateTotals(response, 0, 0);
    -                    }
    -                }
    -                Thread.sleep(50);
    -            }
    -
    -            if (this.getTotalOutstanding() > 0) {
    -                LOGGER.error("We never cleared our buffer");
    -            }
    +            flushInternal();
     
    +            waitToCatchUp(0, 5 * 60 * 1000);
    +            refreshIndexes();
     
    -            for (String indexName : this.getAffectedIndexes()) {
    -                createIndexIfMissing(indexName);
    +            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
     
    -                if (this.veryLargeBulk) {
    -                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    -                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    -                    // refreshing back on.
    -                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
    +        } catch (Throwable e) {
    +            // this line of code should be logically unreachable.
    +            LOGGER.warn("This is unexpected: {}", e.getMessage());
    +            e.printStackTrace();
    +        }
    +    }
     
    -                    // submit to ElasticSearch
    -                    this.manager.getClient()
    -                            .admin()
    -                            .indices()
    -                            .updateSettings(updateSettingsRequest)
    -                            .actionGet();
    -                }
    +    private void refreshIndexes() {
    +        for (String indexName : this.affectedIndexes) {
     
    -                checkIndexImplications(indexName);
    +            if (this.veryLargeBulk) {
    +                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    +                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    +                // refreshing back on.
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
     
    -                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +                // submit to ElasticSearch
                     this.manager.getClient()
                             .admin()
                             .indices()
    -                        .prepareRefresh(indexName)
    -                        .execute()
    +                        .updateSettings(updateSettingsRequest)
                             .actionGet();
                 }
     
    -            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
    +            checkIndexImplications(indexName);
     
    -        } catch (Exception e) {
    -            // this line of code should be logically unreachable.
    -            LOGGER.warn("This is unexpected: {}", e.getMessage());
    -            e.printStackTrace();
    -        } finally {
    -            this.lock.writeLock().unlock();
    +            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +            this.manager.getClient()
    +                    .admin()
    +                    .indices()
    +                    .prepareRefresh(indexName)
    +                    .execute()
    +                    .actionGet();
             }
         }
     
         @Override
    -    public void flush() throws IOException {
    -        flushInternal();
    -    }
    -
    -    @Override
         public DatumStatusCounter getDatumStatusCounter() {
             DatumStatusCounter counters = new DatumStatusCounter();
    -        counters.incrementAttempt(this.batchItemsSent);
    -        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
    -        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
    +        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
    +        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
             return counters;
         }
     
    -    public void start() {
    -        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
    -            @Override
    -            public void run() {
    -                LOGGER.debug("Checking to see if data needs to be flushed");
    -                long time = System.currentTimeMillis() - lastWrite.get();
    -                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
    -                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
    -                    flushInternal();
    -                }
    -            }
    -        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
    -        manager = new ElasticsearchClientManager(config);
    -        client = manager.getClient();
    -
    -        LOGGER.info(client.toString());
    -    }
    -
    -    public void flushInternal() {
    -        lock.writeLock().lock();
    +    private synchronized void flushInternal() {
             // we do not have a working bulk request, we can just exit here.
    -        if (this.bulkRequest == null || batchItemsSent == 0)
    +        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
                 return;
     
    +        // wait for one minute to catch up if it needs to
    +        waitToCatchUp(5, 1 * 60 * 1000);
    +
             // call the flush command.
    -        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
    +        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
     
    -        // null the flush request, this will be created in the 'add' function below
    -        this.bulkRequest = null;
    +        // reset the current batch statistics
    +        this.currentBatchItems.set(0);
    +        this.currentBatchBytes.set(0);
     
    -        // record the proper statistics, and add it to our totals.
    -        this.totalSizeInBytes += this.batchSizeInBytes;
    -        this.totalSent += batchItemsSent;
    +        // reset our bulk request builder
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
    +    }
     
    -        // reset the current batch statistics
    -        this.batchSizeInBytes = 0;
    -        this.batchItemsSent = 0;
    +    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
    +        int counter = 0;
    +        // If we still have 5 batches outstanding, we need to give it a minute to catch up
    +        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
    +            try {
    +                Thread.yield();
    +                Thread.sleep(1);
    +                timeOutThresholdInMS++;
    --- End diff --
    
    If the throughput warrants 1ms, that is fine.
    
    However, is there a reason why you are incrementing the timeout threshold instead of the counter?
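    
    For reference, a minimal sketch of what I would expect here, assuming the intent is to give up after roughly timeOutThresholdInMS milliseconds, increments the counter rather than the threshold:
    
        private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
            int counter = 0;
            // wait while too many batches are still outstanding, but give up once we
            // have slept roughly timeOutThresholdInMS milliseconds in total
            while (this.getBatchesSent() - this.getBatchesResponded() > batchThreshold
                    && counter < timeOutThresholdInMS) {
                try {
                    Thread.yield();
                    Thread.sleep(1);
                    counter++;   // advance the elapsed counter, leave the threshold alone
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }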


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296922
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    it is still there, just more succinct.
                add(config.getIndex(), config.getType(), streamsDatum.getId(),
                        streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
                        convertAndAppendMetadata(streamsDatum));
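    
    Put another way, a purely illustrative helper (resolveTimestamp is hypothetical, not in the patch) capturing the same fallback:
    
        private static String resolveTimestamp(StreamsDatum streamsDatum) {
            // use the datum's own timestamp when it is present,
            // otherwise fall back to the current time
            return streamsDatum.getTimestamp() == null
                    ? Long.toString(DateTime.now().getMillis())
                    : Long.toString(streamsDatum.getTimestamp().getMillis());
        }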



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296368
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    -                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
    +        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
    +        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
    +            return docAsJson;
    +        else {
    +            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
    +            try {
    +                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
                 }
    -            if (streamsDatum.getDocument() instanceof String)
    -                json = streamsDatum.getDocument().toString();
    -            else {
    -                json = mapper.writeValueAsString(streamsDatum.getDocument());
    +            catch(Throwable e) {
    --- End diff --
    
    it could be bubbled up....



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14291368
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
    --- End diff --
    
    What is the intent of a different lock here?  
    
    As written, threads can enter the other blocks you synchronize on, like add, one at a time, BUT they can also enter this block one at a time while other threads are concurrently in those other methods.  Is that the intent?
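
    For illustration only (hypothetical names, not part of the patch), here is a minimal sketch of how the two monitors interact: the instance lock and the class lock are independent, so a thread holding the class lock does not exclude threads holding the instance lock on the same object.

        // Illustrative sketch only -- hypothetical class and method names.
        // The instance monitor and the class monitor are independent locks.
        public class LockScopeExample {

            private long items = 0;

            // one thread at a time per *instance*
            public void add() {
                synchronized (this) {
                    items++;
                }
            }

            // one thread at a time per *JVM*, across all instances, but it can
            // run concurrently with add() because it is a different monitor
            public void checkIndex(String indexName) {
                synchronized (LockScopeExample.class) {
                    System.out.println("checking index " + indexName);
                }
            }

            public static void main(String[] args) {
                LockScopeExample w = new LockScopeExample();
                new Thread(() -> w.add()).start();
                new Thread(() -> w.checkIndex("activity")).start();
            }
        }

    Whether that interleaving is acceptable depends on whether the class-synchronized block touches the same state as the instance-synchronized ones.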



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300164
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    I see.  I missed that in the line. 



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296411
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
    --- End diff --
    
    to not run too many check index requests simultaneously.



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by sieverssj <gi...@git.apache.org>.
Github user sieverssj commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14291158
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -533,22 +424,8 @@ public void createIndexIfMissing(String indexName) {
             }
         }
     
    -    public void add(String indexName, String type, Map<String, Object> toImport) {
    -        for (String id : toImport.keySet())
    -            add(indexName, type, id, (String) toImport.get(id));
    -    }
    -
    -    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
    -        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
    -
    -        for (String toAddId : workingBatch.keySet())
    -            if (!invalidIDs.contains(toAddId))
    -                add(index, type, toAddId, workingBatch.get(toAddId));
    -
    -        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
    -    }
    -
    -
    +    /**
    --- End diff --
    
    If this is no longer used, it should be removed.



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-streams/pull/45



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by sieverssj <gi...@git.apache.org>.
Github user sieverssj commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14291337
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    -                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
    +        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
    +        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
    +            return docAsJson;
    +        else {
    +            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
    +            try {
    +                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
                 }
    -            if (streamsDatum.getDocument() instanceof String)
    -                json = streamsDatum.getDocument().toString();
    -            else {
    -                json = mapper.writeValueAsString(streamsDatum.getDocument());
    +            catch(Throwable e) {
    --- End diff --
    
    The only caller of this method also catches Throwable.  It wouldn't hurt to let the exception bubble up.
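
    As a rough sketch of that alternative (hypothetical, simplified names, not the actual patch), the converter can simply declare the checked exception and rely on the caller's single existing catch:

        // Illustrative sketch only -- hypothetical names and simplified logic.
        import java.io.IOException;

        public class BubbleUpExample {

            // conversion step: no local try/catch, just declare the exception
            static String convert(String doc) throws IOException {
                if (doc == null) {
                    throw new IOException("document could not be parsed");
                }
                return doc.trim();
            }

            // the only caller already catches Throwable, so nothing extra is needed
            static void write(String doc) {
                try {
                    String json = convert(doc);
                    System.out.println("indexing: " + json);
                } catch (Throwable e) {
                    System.err.println("Unable to write datum: " + e.getMessage());
                }
            }

            public static void main(String[] args) {
                write("  {\"id\": 1}  ");
                write(null);
            }
        }

    One consequence of bubbling up is that a conversion failure is then handled the same way as any other write failure for that datum.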



[GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...

Posted by smashew <gi...@git.apache.org>.
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296459
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -533,22 +424,8 @@ public void createIndexIfMissing(String indexName) {
             }
         }
     
    -    public void add(String indexName, String type, Map<String, Object> toImport) {
    -        for (String id : toImport.keySet())
    -            add(indexName, type, id, (String) toImport.get(id));
    -    }
    -
    -    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
    -        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
    -
    -        for (String toAddId : workingBatch.keySet())
    -            if (!invalidIDs.contains(toAddId))
    -                add(index, type, toAddId, workingBatch.get(toAddId));
    -
    -        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
    -    }
    -
    -
    +    /**
    --- End diff --
    
    It is removed; that is why it is red in the diff.

