You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/21 02:11:00 UTC

incubator-usergrid git commit: WIP and a mess, squash later.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-494 4c59f0922 -> c4f654847


WIP and a mess, squash later.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c4f65484
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c4f65484
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c4f65484

Branch: refs/heads/USERGRID-494
Commit: c4f654847b8041a94176996feffa4060dd638819
Parents: 4c59f09
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 19:10:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 19:10:58 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/io/read/CommandBuilder.java |  36 +++---
 .../io/read/EntityIndexCommands.java            | 119 +++++++++++++++++++
 .../corepersistence/io/state/CursorCache.java   |  34 ++++++
 .../corepersistence/rx/impl/CollectUntil.java   |  60 ++++++++++
 .../java/org/apache/usergrid/TempExample.java   |  74 ++++++++++++
 .../rx/impl/CollectUntilTest.java               |  62 ++++++++++
 6 files changed, 368 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
index 119fc0e..828cbc9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.usergrid.corepersistence.io.reduce.StreamReducer;
+import org.apache.usergrid.corepersistence.io.state.CursorCache;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -35,31 +36,32 @@ import rx.functions.Func1;
  */
 public class CommandBuilder {
 
-    private final Id root;
-    private final List<Command<Id>> commandList;
 
+    private CursorCache cache;
+    private final Observable<Id> pathObservable;
 
-    public CommandBuilder( final Id root ) {this.root = root;
-        commandList = new ArrayList<>(  );
-    }
 
-    public void addIntermediateCommand(final Command<Id> command){
-      commandList.add( command );
+    public CommandBuilder( final Id root ) {
+       pathObservable = Observable.just( root ); // the path starts as a single-item observable emitting the root id
     }
 
 
-    public <T> void addFinalCommand( final Command<T> command, final StreamReducer<T> reducer ) {
-
-        Observable.just("foo").flatMap( new Func1<String, Observable<?>>() {
-            @Override
-            public Observable<?> call( final String s ) {
-                return null;
-            }
-        };
+    /**
+     * Sets the cursor cache held by this builder, replacing any previously assigned instance.
+     * @param cache the cursor cache to keep; the reference is stored as given, without a null check
+     */
+    public void setCache(final CursorCache cache){
+        this.cache = cache;
     }
 
 
-    public List<Command<Id>> getCommands(){
-        return  commandList;
+    /**
+     * Returns the observable that represents the current traversal path.
+     * @return the observable created in the constructor; in this class it is only ever built from the root id
+     */
+    public Observable<Id> getPathObservable(){
+        return pathObservable;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
new file mode 100644
index 0000000..d6f4e93
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.io.read;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Action2;
+import rx.functions.Func1;
+
+
+public class EntityIndexCommands {
+
+    // Static factory methods that produce the RxJava functions/actions for an Id -> IndexScope -> CandidateResults chain.
+//    /**
+//     * Perform a search of all the entities, and then return the observable of search results
+//     * @param index
+//     * @param edgeType
+//     * @param types
+//     * @param query
+//     * @return
+//     */
+//    public static Func1<Id, SearchResults> searchEntities(final ApplicationEntityIndex index,final String edgeType, final SearchTypes types, final String query  ){
+//
+//        return nodeId -> {
+//
+//        }
+//    }
+    /**
+     * Builds a function that wraps an incoming id in an index scope.
+     * @param type the type name handed to the IndexScopeImpl constructor together with the id
+     * @return a function mapping an Id to a new IndexScopeImpl( id, type )
+     */
+   public static Func1<Id, IndexScope> createSearchScope(final String type){
+       return id -> new IndexScopeImpl( id, type );
+   }
+    /**
+     * Builds a function that runs a search against the index for each incoming scope.
+     * @param index the index whose search method is invoked; it is dereferenced when the function runs, so it must not be null
+     * @param types The types to return
+     * @param query the query string, converted to a Query via Query.fromQLNullSafe on every invocation
+     * @return a function mapping an IndexScope to the CandidateResults returned by index.search
+     */
+    public static Func1<IndexScope, CandidateResults> getCandidates(final ApplicationEntityIndex index, final SearchTypes types, final String query){
+        return indexScope -> index.search( indexScope, types, Query.fromQLNullSafe( query ) );
+    }
+
+
+    /**
+     * Flattens candidate results into a stream of individual results (intended for use with flatMap).
+     * @return a function that turns one CandidateResults into an Observable of its CandidateResult elements via Observable.from
+     */
+    public static Func1<CandidateResults, Observable<CandidateResult>> flattenCandidates(){
+        return (CandidateResults candidateResults) -> Observable.from( candidateResults );
+    }
+
+    /** Returns an action that appends all entities of an EntitySet to a SearchResults accumulator. */
+    public static Action2<SearchResults, EntitySet> collectSet(){
+        return (searchResults, entitySet) -> {
+          searchResults.addEntities( entitySet.entities );
+        };
+    }
+
+
+    /** Mutable accumulator for entities plus a cursor string; neither field has a getter in this version. */
+    public static class SearchResults{
+        private final List<Entity> entities;
+        private String cursor; // written by setCursor; not read anywhere in this class yet
+
+        /** Note: maxSize is only the initial capacity of the backing ArrayList; it is not enforced as an upper bound. */
+        public SearchResults(final int maxSize) {entities = new ArrayList<>(maxSize);}
+        /** Appends every element of the given collection to the accumulated entities. */
+        public void addEntities(final Collection<Entity> entities){
+            this.entities.addAll( entities );
+
+        }
+        public void setCursor(final String cursor){
+            this.cursor = cursor;
+        }
+    }
+
+    public static class EntitySet{
+        private final List<Entity> entities;
+
+        /** Keeps the given list reference directly; no defensive copy is made. */
+        public EntitySet( final List<Entity> entities ) {this.entities = entities;}
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java
new file mode 100644
index 0000000..8a5ca49
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/state/CursorCache.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.io.state;
+
+
+/**
+ * Represents a cursor cache. Currently an empty shell: it declares no fields or methods besides its constructor.
+ */
+public class CursorCache {
+
+    public CursorCache(){
+        // nothing to initialise: the class has no state
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
new file mode 100644
index 0000000..a2cb754
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import rx.Observable;
+import rx.functions.Action2;
+import rx.functions.Func0;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.internal.operators.OperatorScan;
+
+
+/**
+ * Transformer that folds source items of type T into a state object of type R (a scan) and applies takeUntil with the predicate.
+ */
+public class CollectUntil<T, R> implements Observable.Transformer<T, R> {
+    // stateFactory creates the state object, collector folds one item into it, predicate is evaluated on the emitted state
+    final Func0<R> stateFactory;
+    final Action2<R, ? super T> collector;
+    final Func1<R, Boolean> predicate;
+
+    /** Stores the three callbacks as given; no null checks are performed. */
+    public CollectUntil( final Func0<R> stateFactory, final Action2<R, ? super T> collector,
+                          final Func1<R, Boolean> predicate ) {
+        this.stateFactory = stateFactory;
+        this.collector = collector;
+        this.predicate = predicate;
+    }
+
+    /** Applies the scan-then-takeUntil pipeline to the given source and returns the resulting observable of states. */
+    @Override
+    public Observable<R> call( final Observable<T> tObservable ) {
+        Func2<R, T, R> accumulator = ( state, value ) -> {
+            collector.call( state, value );
+            return state; // the same state instance (mutated by the collector) is passed along, not a copy
+        };
+
+        // NOTE(review): OperatorScan comes from rx.internal.operators, which looks like non-public RxJava API - confirm this is acceptable
+        return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( predicate );
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/test/java/org/apache/usergrid/TempExample.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TempExample.java b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
new file mode 100644
index 0000000..1db450d
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid;
+
+
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.createSearchScope;
+import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.getCandidates;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+
+
+public class TempExample {
+
+    // Scratch example: builds an observable chain from a root id using the EntityIndexCommands helpers
+
+
+    public static void main(String[] args) {
+
+        final Id rootId = createId( "thing" );
+        // NOTE(review): index is null, so the getCandidates step will presumably fail with a NullPointerException once subscribed
+        final ApplicationEntityIndex index = null;
+
+
+        final SearchTypes searchType = SearchTypes.fromTypes( "test" );
+
+        final String query = "select * ";
+
+        final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ).map(getCandidates(index, searchType, query));
+
+        // subscribe and block for the last item, printing each emitted CandidateResults on the way
+        observable.doOnNext( a -> System.out.println( a) ).toBlocking().last();
+    }
+
+
+    private static final class ResultsCollector{
+
+        /**
+         * Add the candidates to our collection (unfinished, see the TODO in the body)
+         * @param results the candidate results that are grouped by candidate id
+         */
+        public void addCandidates(final CandidateResults results ){
+
+            //TODO, collect the results, removing groups. NOTE(review): the statement below is unfinished (collect() has no arguments, no terminating semicolon)
+            Observable.from( results ).groupBy( candidate -> candidate.getId() ).collect(  )
+        }
+
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4f65484/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
new file mode 100644
index 0000000..ce12429
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.junit.Test;
+
+import rx.Observable;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class CollectUntilTest {
+    /** Checks that the last collector emitted has counted exactly 100 items although the source offers 200. */
+    @Test
+    public void testCollectUntil() {
+        // count every emitted integer; the predicate reports "full" when the count is exactly 100
+        final CollectUntil<Integer, CountCollector> collectUntil =
+            new CollectUntil<>(
+                () -> new CountCollector(),
+                ( collector, value ) -> collector.mark(),
+                collector -> collector.isFull() );
+
+        // the source emits 200 integers; the last state emitted by the transformer is the one asserted on
+        final CountCollector collector = Observable.range( 0, 200 ).compose( collectUntil ).toBlocking().last();
+
+        assertEquals( 100, collector.count );
+    }
+
+    /** Counter used as the collected state; isFull is true only while the count equals exactly 100. */
+    private static final class CountCollector {
+
+        private int count;
+
+
+        public void mark() {
+            count++;
+        }
+
+
+        public boolean isFull() {
+            return count == 100;
+        }
+    }
+}