Posted to dev@usergrid.apache.org by peterj99a <gi...@git.apache.org> on 2017/11/11 18:37:55 UTC
[GitHub] usergrid pull request #584: Pj direct 2 es
GitHub user peterj99a opened a pull request:
https://github.com/apache/usergrid/pull/584
Pj direct 2 es
Add a setting to bypass AWS when indexing
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/peterj99a/usergrid pj_direct_2_es
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/usergrid/pull/584.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #584
----
commit 7139781518869529665a528f97215a7301074adc
Author: Peter Johnson <pj...@apigee.com>
Date: 2017-11-02T18:46:27Z
Allow index requests to be sent directly to ES
commit 8b798b9a8fc9c26ffe2562bd7032efeaf01f51ab
Author: Peter Johnson <pj...@apigee.com>
Date: 2017-11-07T20:29:50Z
Add option to include old version in result
Add debug options
Add gzip support
----
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150619660
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/index/settings/QueueIndexingStrategy.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum QueueIndexingStrategy {
+
+ NOINDEX("noindex"), // Do not index the entity (DEBUG only; use for testing)
--- End diff --
done in the correct version of the class
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150619942
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/IndexConsistency.java ---
@@ -0,0 +1,65 @@
+/*
--- End diff --
removed
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150618153
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Buffers events and dispatches them in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+ private String fileName = "my_file_name.txt";
--- End diff --
done
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150599818
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---
@@ -217,6 +221,11 @@ private boolean nestedFieldCheck( String[] parts, Map<String, Field> fieldMap) {
}
+
+ static {
--- End diff --
remove?
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150597889
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the AsyncEventService that writes first directly to ES
+ * and then submits to AWS as a backup.
+ *
+ * Created by peterajohnson on 8/29/17.
+ */
+public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
+
+ private boolean indexDebugMode = false;
+ private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
+
+ private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
+
+ public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
+ super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
+
+ //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
+ bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
+
+ configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
+
+ indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
+
+ }
+
+ protected void dispatchToES(final List<Serializable> bodies) {
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ for (Serializable body : bodies) {
+ String uuid = UUID.randomUUID().toString();
+ LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
--- End diff --
type looks weird
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150589242
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
@@ -704,7 +707,24 @@ public void queueIndexOperationMessage(final IndexOperationMessage indexOperatio
offerTopic( elasticsearchIndexEvent, queueType );
}
- private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
+ protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
--- End diff --
Should this be named getIndexEvent or something similar? The index operation message is the input, not what the method returns.
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150624265
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the AsyncEventService that writes first directly to ES
+ * and then submits to AWS as a backup.
+ *
+ * Created by peterajohnson on 8/29/17.
+ */
+public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
+
+ private boolean indexDebugMode = false;
+ private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
+
+ private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
+
+ public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
+ super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
+
+ //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
+ bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
+
+ configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
+
+ indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
+
+ }
+
+ protected void dispatchToES(final List<Serializable> bodies) {
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ for (Serializable body : bodies) {
+ String uuid = UUID.randomUUID().toString();
+ LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
--- End diff --
ok, missed that, sorry
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150607030
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/QueueIndexingStrategy.java ---
@@ -0,0 +1,80 @@
+/*
--- End diff --
.../settigs/QueueIndexingStrategy -- this is a duplicate file
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150592038
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Buffers events and dispatches them in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+ private String fileName = "my_file_name.txt";
+ private Consumer<List<T>> consumer;
+
+ ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
--- End diff --
add comment if implementation requires only one thread be used
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/usergrid/pull/584
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150598885
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/index/settings/QueueIndexingStrategy.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum QueueIndexingStrategy {
+
+ NOINDEX("noindex"), // Do not index the entity (DEBUG only; use for testing)
--- End diff --
change to debug_noindex and debug_directonly
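
To make the suggested rename concrete, here is a rough sketch of how the debug-only values could look once they carry a `debug_` prefix. This is not the enum from the pull request: the name `IndexingStrategySketch`, the `"async"` config string, the `isDebugOnly()` helper, the fallback to `ASYNC` on unknown input, and the one-line descriptions of each value are all assumptions made for illustration. Only `NOINDEX`, `ASYNC`, the static `get(...)` lookup, and the two proposed names come from the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for QueueIndexingStrategy; the values and their meanings are assumed.
enum IndexingStrategySketch {

    DEBUG_NOINDEX("debug_noindex"),       // assumed: debug only, do not index the entity
    DEBUG_DIRECTONLY("debug_directonly"), // assumed: debug only, send to ES without the queue
    ASYNC("async");                       // assumed default: go through the queue

    // Lookup table from config string to constant, built once.
    private static final Map<String, IndexingStrategySketch> BY_NAME;

    static {
        Map<String, IndexingStrategySketch> m = new HashMap<>();
        for (IndexingStrategySketch s : values()) {
            m.put(s.configName, s);
        }
        BY_NAME = Collections.unmodifiableMap(m);
    }

    private final String configName;

    IndexingStrategySketch(String configName) {
        this.configName = configName;
    }

    String configName() {
        return configName;
    }

    // The prefix makes debug-only settings easy to spot in config files and in code.
    boolean isDebugOnly() {
        return configName.startsWith("debug_");
    }

    // Case-insensitive lookup; unknown or missing values fall back to ASYNC (an assumption).
    static IndexingStrategySketch get(String name) {
        if (name == null) {
            return ASYNC;
        }
        IndexingStrategySketch s = BY_NAME.get(name.trim().toLowerCase(Locale.ROOT));
        return s != null ? s : ASYNC;
    }
}
```

With names like these, a startup check could log a warning whenever `isDebugOnly()` is true, so a debug setting is less likely to end up in production unnoticed.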
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150607146
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/IndexConsistency.java ---
@@ -0,0 +1,65 @@
+/*
--- End diff --
.../settigs/IndexConsistency -- this is a duplicate file
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150618787
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Buffers events and dispatches them in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+ private String fileName = "my_file_name.txt";
+ private Consumer<List<T>> consumer;
+
+ ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
--- End diff --
removed
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150618613
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
@@ -704,7 +707,24 @@ public void queueIndexOperationMessage(final IndexOperationMessage indexOperatio
offerTopic( elasticsearchIndexEvent, queueType );
}
- private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
+ protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
--- End diff --
done
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150601310
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java ---
@@ -58,8 +59,12 @@ public CollectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactor
final Optional<String> query = search.getQuery();
final IdBuilder pipelineBuilder =
- pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
- .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+ pipelineBuilderFactory.create( applicationScope )
+ .withCursor( search.getCursor() )
+ .withLimit( search.getLimit() )
+ .keepStaleEntries(search.getKeepStaleEntries())
+ .query(search.getQuery())
--- End diff --
.query(query)
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150619585
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.settings.QueueIndexingStrategy;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the AsyncEventService that writes first directly to ES
+ * and then submits to AWS as a backup.
+ *
+ * Created by peterajohnson on 8/29/17.
+ */
+public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
+
+ private boolean indexDebugMode = false;
+ private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
+
+ private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
+
+ public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
+ super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
+
+ //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
+ bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
+
+ configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
+
+ indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
+
+ }
+
+ protected void dispatchToES(final List<Serializable> bodies) {
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ for (Serializable body : bodies) {
+ String uuid = UUID.randomUUID().toString();
+ LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
--- End diff --
Yeah, the "put type here" was in the original implementation. I just copied it over to keep it the same.
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150619965
--- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settigs/QueueIndexingStrategy.java ---
@@ -0,0 +1,80 @@
+/*
--- End diff --
removed
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150589335
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---
@@ -216,6 +214,10 @@ public Long getValue() {
start();
}
+ protected Histogram getMessageCycye() {
--- End diff --
typo (getMessageCycle)
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150594658
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by peterajohnson on 10/27/17.
+ */
--- End diff --
Probably a one-line description of the class (non-buffered implementation of BufferedQueue) would be useful. Do you think this class makes more sense than modifying BufferedQueue to have this implementation if interval = 0? Then we could switch between buffered and non-buffered using config values.
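
As a rough illustration of the single-class alternative, the sketch below hands each item straight to the consumer when the interval is 0 and batches otherwise. It is not the `BufferedQueue` interface from the pull request: the class name `ConfigurableQueue`, the `offer`, `flush` and `close` methods, and the constructor shape (loosely modelled on the commented-out `new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS)` call) are assumptions. Only `setConsumer` with a `Consumer<List<T>>` is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: one queue class that is buffered or unbuffered depending on config.
final class ConfigurableQueue<T> implements AutoCloseable {

    private final int maxBatchSize;
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler; // null means unbuffered mode
    private volatile Consumer<List<T>> consumer = batch -> { };

    ConfigurableQueue(int maxBatchSize, long interval, TimeUnit unit) {
        this.maxBatchSize = maxBatchSize;
        long intervalMillis = unit.toMillis(interval);
        if (intervalMillis > 0) {
            // A single thread is enough here: its only job is the periodic drain.
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleWithFixedDelay(
                    this::safeFlush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        } else {
            scheduler = null;
        }
    }

    void setConsumer(Consumer<List<T>> consumer) {
        this.consumer = consumer;
    }

    void offer(T item) {
        if (scheduler == null) {
            // interval == 0: behave like the NOP queue and dispatch immediately.
            consumer.accept(Collections.singletonList(item));
            return;
        }
        boolean full;
        synchronized (buffer) {
            buffer.add(item);
            full = buffer.size() >= maxBatchSize;
        }
        if (full) {
            flush();
        }
    }

    // Hands everything buffered so far to the consumer as one batch.
    void flush() {
        List<T> batch;
        synchronized (buffer) {
            if (buffer.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(buffer);
            buffer.clear();
        }
        consumer.accept(batch);
    }

    // A task that throws cancels later scheduled runs, so swallow consumer failures here.
    private void safeFlush() {
        try {
            flush();
        } catch (RuntimeException e) {
            // A real implementation would log this.
        }
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        flush(); // deliver anything still buffered
    }
}
```

Whether this beats a separate NOP class is a judgment call: the single class lets config values choose the mode, while the separate class keeps the unbuffered path free of any locking or scheduler code.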
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by mdunker <gi...@git.apache.org>.
Github user mdunker commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150590909
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Buffers events and dispatches them in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+ private String fileName = "my_file_name.txt";
--- End diff --
delete this?
---
[GitHub] usergrid pull request #584: Pj direct 2 es
Posted by peterj99a <gi...@git.apache.org>.
Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/584#discussion_r150619814
--- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---
@@ -217,6 +221,11 @@ private boolean nestedFieldCheck( String[] parts, Map<String, Field> fieldMap) {
}
+
+ static {
--- End diff --
yeah gone
---