Posted to dev@usergrid.apache.org by peterj99a <gi...@git.apache.org> on 2017/11/11 18:37:55 UTC

[GitHub] usergrid pull request #584: Pj direct 2 es

GitHub user peterj99a opened a pull request:

    https://github.com/apache/usergrid/pull/584

    Pj direct 2 es

    Add setting to bypass AWS when indexing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peterj99a/usergrid pj_direct_2_es

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/usergrid/pull/584.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #584
    
----
commit 7139781518869529665a528f97215a7301074adc
Author: Peter Johnson <pj...@apigee.com>
Date:   2017-11-02T18:46:27Z

    Allow index requests to be sent directly to ES

commit 8b798b9a8fc9c26ffe2562bd7032efeaf01f51ab
Author: Peter Johnson <pj...@apigee.com>
Date:   2017-11-07T20:29:50Z

    Add option to include old version in result
    Add debug options
    Add gzip support

----


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150619660
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/index/settings/QueueIndexingStrategy.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.index.settings;
    +
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * This class describes the paths an index request can take
    + * between tomcat and ES.
    + *
    + * Created by peterajohnson on 10/30/17.
    + */
    +public enum QueueIndexingStrategy {
    +
    +    NOINDEX("noindex"),        // Do not Index the entity (DEBUG only use for testing)
    --- End diff --
    
    done in the correct version of the class


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150619942
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/IndexConsistency.java ---
    @@ -0,0 +1,65 @@
    +/*
    --- End diff --
    
    removed


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150618153
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.*;
    +import java.util.function.Consumer;
    +
    +/**
    + * Bufferes events and dispatched then in batches.
    + * Ensures that the callback will be called at a min interval.
    + */
    +public class BufferedQueueImpl<T> implements BufferedQueue<T> {
    +
    +    private String fileName = "my_file_name.txt";
    --- End diff --
    
    done


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150599818
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---
    @@ -217,6 +221,11 @@ private boolean nestedFieldCheck( String[] parts, Map<String, Field> fieldMap) {
         }
     
     
    +
    +    static {
    --- End diff --
    
    remove?


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150597889
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
    +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
    +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
    +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
    +import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
    +import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
    +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
    +import org.apache.usergrid.persistence.index.EntityIndexFactory;
    +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
    +import org.apache.usergrid.persistence.index.impl.IndexProducer;
    +import org.apache.usergrid.persistence.map.MapManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueFig;
    +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.UUID;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Implementation of the AsyncEventService that writes first directly to ES
    + * and then submits to ASW as a backup.
    + *
    + * Created by peterajohnson on 8/29/17.
    + */
    +public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
    +
    +
    +    private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
    +
    +    private boolean indexDebugMode = false;
    +    private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
    +
    +    private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
    +
    +    public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
    +        super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
    +
    +        //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
    +        bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
    +
    +        configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
    +
    +        indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
    +
    +    }
    +
    +    protected void dispatchToES(final List<Serializable> bodies) {
    +
    +        List<LegacyQueueMessage> messages = new ArrayList<>();
    +        for (Serializable body : bodies) {
    +            String uuid = UUID.randomUUID().toString();
    +            LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
    --- End diff --
    
    type looks weird


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150589242
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
    @@ -704,7 +707,24 @@ public void queueIndexOperationMessage(final IndexOperationMessage indexOperatio
             offerTopic( elasticsearchIndexEvent, queueType );
         }
     
    -    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
    +    protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
    --- End diff --
    
    Should this be named getIndexEvent or something similar? The index operation message is the input, not the value returned.


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150624265
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
    +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
    +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
    +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
    +import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
    +import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
    +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
    +import org.apache.usergrid.persistence.index.EntityIndexFactory;
    +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
    +import org.apache.usergrid.persistence.index.impl.IndexProducer;
    +import org.apache.usergrid.persistence.map.MapManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueFig;
    +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.UUID;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Implementation of the AsyncEventService that writes first directly to ES
    + * and then submits to ASW as a backup.
    + *
    + * Created by peterajohnson on 8/29/17.
    + */
    +public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
    +
    +
    +    private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
    +
    +    private boolean indexDebugMode = false;
    +    private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
    +
    +    private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
    +
    +    public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
    +        super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
    +
    +        //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
    +        bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
    +
    +        configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
    +
    +        indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
    +
    +    }
    +
    +    protected void dispatchToES(final List<Serializable> bodies) {
    +
    +        List<LegacyQueueMessage> messages = new ArrayList<>();
    +        for (Serializable body : bodies) {
    +            String uuid = UUID.randomUUID().toString();
    +            LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
    --- End diff --
    
    ok, missed that, sorry


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150607030
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/QueueIndexingStrategy.java ---
    @@ -0,0 +1,80 @@
    +/*
    --- End diff --
    
    .../settigs/QueueIndexingStrategy -- this is a duplicate file


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150592038
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.*;
    +import java.util.function.Consumer;
    +
    +/**
    + * Bufferes events and dispatched then in batches.
    + * Ensures that the callback will be called at a min interval.
    + */
    +public class BufferedQueueImpl<T> implements BufferedQueue<T> {
    +
    +    private String fileName = "my_file_name.txt";
    +    private Consumer<List<T>> consumer;
    +
    +    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
    --- End diff --
    
    add comment if implementation requires only one thread be used


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/usergrid/pull/584


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150598885
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/index/settings/QueueIndexingStrategy.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.index.settings;
    +
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * This class describes the paths an index request can take
    + * between tomcat and ES.
    + *
    + * Created by peterajohnson on 10/30/17.
    + */
    +public enum QueueIndexingStrategy {
    +
    +    NOINDEX("noindex"),        // Do not Index the entity (DEBUG only use for testing)
    --- End diff --
    
    change to debug_noindex and debug_directonly
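
A rough sketch of what this rename could look like follows. Only the NOINDEX constant, the ASYNC constant, and a get(String) lookup are visible in the quoted diffs; the enum name SketchQueueIndexingStrategy, the DEBUG_DIRECTONLY constant, the "async" config string, the isDebugOnly() helper, and the fallback to ASYNC for unknown values are all assumptions made for illustration, not the code in the pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for QueueIndexingStrategy; names and values are assumed. */
enum SketchQueueIndexingStrategy {

    DEBUG_NOINDEX("debug_noindex"),       // do not index at all (testing only)
    DEBUG_DIRECTONLY("debug_directonly"), // assumed: direct to ES with no queue backup (testing only)
    ASYNC("async");                       // assumed config string for the queued path

    // Lookup table from config string to constant, built once.
    private static final Map<String, SketchQueueIndexingStrategy> BY_NAME;
    static {
        Map<String, SketchQueueIndexingStrategy> m = new HashMap<>();
        for (SketchQueueIndexingStrategy s : values()) {
            m.put(s.configName, s);
        }
        BY_NAME = Collections.unmodifiableMap(m);
    }

    private final String configName;

    SketchQueueIndexingStrategy(String configName) {
        this.configName = configName;
    }

    String getConfigName() {
        return configName;
    }

    /** The debug_ prefix makes test-only strategies easy to spot in config files. */
    boolean isDebugOnly() {
        return configName.startsWith("debug_");
    }

    /** Resolves a config value; unknown or null values fall back to ASYNC (an assumption). */
    static SketchQueueIndexingStrategy get(String name) {
        if (name == null) {
            return ASYNC;
        }
        SketchQueueIndexingStrategy s = BY_NAME.get(name.trim().toLowerCase(Locale.ROOT));
        return s == null ? ASYNC : s;
    }
}
```

With a prefix like this, a production configuration check could reject any strategy for which isDebugOnly() returns true.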


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150607146
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/IndexConsistency.java ---
    @@ -0,0 +1,65 @@
    +/*
    --- End diff --
    
    .../settigs/IndexConsistency -- this is a duplicate file


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150618787
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.*;
    +import java.util.function.Consumer;
    +
    +/**
    + * Bufferes events and dispatched then in batches.
    + * Ensures that the callback will be called at a min interval.
    + */
    +public class BufferedQueueImpl<T> implements BufferedQueue<T> {
    +
    +    private String fileName = "my_file_name.txt";
    +    private Consumer<List<T>> consumer;
    +
    +    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
    --- End diff --
    
    removed


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150618613
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
    @@ -704,7 +707,24 @@ public void queueIndexOperationMessage(final IndexOperationMessage indexOperatio
             offerTopic( elasticsearchIndexEvent, queueType );
         }
     
    -    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
    +    protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
    --- End diff --
    
    done


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150601310
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java ---
    @@ -58,8 +59,12 @@ public CollectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactor
             final Optional<String> query = search.getQuery();
     
             final IdBuilder pipelineBuilder =
    -            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
    -                                  .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
    +            pipelineBuilderFactory.create( applicationScope )
    +                .withCursor( search.getCursor() )
    +                .withLimit( search.getLimit() )
    +                .keepStaleEntries(search.getKeepStaleEntries())
    +                .query(search.getQuery())
    --- End diff --
    
    .query(query)


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150619585
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
    +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
    +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
    +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
    +import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
    +import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
    +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
    +import org.apache.usergrid.persistence.index.EntityIndexFactory;
    +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
    +import org.apache.usergrid.persistence.index.impl.IndexProducer;
    +import org.apache.usergrid.persistence.map.MapManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueFig;
    +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
    +import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.UUID;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Implementation of the AsyncEventService that writes first directly to ES
    + * and then submits to ASW as a backup.
    + *
    + * Created by peterajohnson on 8/29/17.
    + */
    +public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
    +
    +
    +    private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
    +
    +    private boolean indexDebugMode = false;
    +    private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
    +
    +    private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
    +
    +    public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
    +        super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
    +
    +        //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
    +        bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
    +
    +        configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
    +
    +        indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
    +
    +    }
    +
    +    protected void dispatchToES(final List<Serializable> bodies) {
    +
    +        List<LegacyQueueMessage> messages = new ArrayList<>();
    +        for (Serializable body : bodies) {
    +            String uuid = UUID.randomUUID().toString();
    +            LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
    --- End diff --
    
    Yeah, the "put type here" was in the original implementation. I just copied it over to keep it the same.


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150619965
  
    --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/QueueIndexingStrategy.java ---
    @@ -0,0 +1,80 @@
    +/*
    --- End diff --
    
    removed


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150589335
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
    @@ -216,6 +214,10 @@ public Long getValue() {
             start();
         }
     
    +    protected Histogram getMessageCycye() {
    --- End diff --
    
    typo (getMessageCycle)


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150594658
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +import java.util.function.Consumer;
    +
    +/**
    + * Created by peterajohnson on 10/27/17.
    + */
    --- End diff --
    
    Probably a one-line description of the class (non-buffered implementation of BufferedQueue) would be useful. Do you think this class makes more sense than modifying BufferedQueue to have this implementation if interval = 0? Then we could switch between buffered and non-buffered using config values.
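
To make the alternative concrete, here is a minimal sketch of a single queue class that passes items straight through when the interval is 0 and buffers them otherwise. The BufferedQueue interface is not shown in this thread, so the class name IntervalBufferedQueue, its constructor, and the offer, flush, and shutdown methods are hypothetical; only setConsumer with a Consumer of a List is taken from the quoted diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch; not the BufferedQueue interface from the pull request. */
class IntervalBufferedQueue<T> {

    private final long interval;
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler; // null when unbuffered
    private volatile Consumer<List<T>> consumer = batch -> { };

    IntervalBufferedQueue(long interval, TimeUnit unit) {
        this.interval = interval;
        if (interval > 0) {
            // One daemon thread flushes the buffer at a fixed interval.
            scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "buffered-queue-flush");
                t.setDaemon(true);
                return t;
            });
            scheduler.scheduleAtFixedRate(this::flush, interval, interval, unit);
        } else {
            scheduler = null;
        }
    }

    void setConsumer(Consumer<List<T>> consumer) {
        this.consumer = consumer;
    }

    void offer(T item) {
        if (interval <= 0) {
            // Interval 0: behave like the non-buffered queue and dispatch immediately.
            consumer.accept(Collections.singletonList(item));
            return;
        }
        synchronized (buffer) {
            buffer.add(item);
        }
    }

    /** Hands everything buffered so far to the consumer as one batch. */
    void flush() {
        List<T> batch;
        synchronized (buffer) {
            if (buffer.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(buffer);
            buffer.clear();
        }
        consumer.accept(batch);
    }

    void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

The trade-off is one class with a branch in offer versus two small classes behind one interface. The single class lets a config value select the mode, while the separate no-op class keeps the pass-through path free of any scheduler code.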


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150590909
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.usergrid.corepersistence.asyncevents.direct;
    +
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.*;
    +import java.util.function.Consumer;
    +
    +/**
    + * Bufferes events and dispatched then in batches.
    + * Ensures that the callback will be called at a min interval.
    + */
    +public class BufferedQueueImpl<T> implements BufferedQueue<T> {
    +
    +    private String fileName = "my_file_name.txt";
    --- End diff --
    
    delete this?


---

[GitHub] usergrid pull request #584: Pj direct 2 es

Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:

    https://github.com/apache/usergrid/pull/584#discussion_r150619814
  
    --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---
    @@ -217,6 +221,11 @@ private boolean nestedFieldCheck( String[] parts, Map<String, Field> fieldMap) {
         }
     
     
    +
    +    static {
    --- End diff --
    
    yeah gone


---