You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/31 00:32:42 UTC
[08/50] [abbrv] usergrid git commit: Removed unused class and added logging
Removed unused class and added logging
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b4faf72
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b4faf72
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b4faf72
Branch: refs/heads/master
Commit: 8b4faf72221ec9eb2b8bb59cca0263c20b483ccb
Parents: db852d0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 09:58:45 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 09:58:45 2015 -0600
----------------------------------------------------------------------
.../read/traverse/AbstractReadGraphFilter.java | 10 ++-
.../read/traverse/EntityLoadVerifyFilter.java | 24 +++---
.../persistence/ObservableIterator.java | 83 --------------------
.../persistence/core/rx/ObservableIterator.java | 2 +-
4 files changed, 24 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index f477092..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,9 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -41,6 +44,8 @@ import rx.Observable;
*/
public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+ private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
+
private final GraphManagerFactory graphManagerFactory;
@@ -82,7 +87,10 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
*/
return graphManager.loadEdgesFromSource( search )
//set the edge state for cursors
- .doOnNext( edge -> edgeCursorState.update( edge ) )
+ .doOnNext( edge -> {
+ logger.trace( "Seeking over edge {}", edge );
+ edgeCursorState.update( edge );
+ } )
//map our id from the target edge and set our cursor every edge we traverse
.map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 41507e9..c782bce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -47,7 +47,9 @@ import rx.Observable;
*
* TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
*/
-public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>>{
+public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>> {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class );
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -69,17 +71,19 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
final Observable<FilterResult<Entity>> entityObservable =
filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
- final Observable<EntitySet> entitySetObservable =
- Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
- .flatMap( ids -> entityCollectionManager.load( ids ) );
+ logger.trace( "Attempting to batch load ids {}", bufferedIds );
+
+ final Observable<EntitySet> entitySetObservable =
+ Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+ .flatMap( ids -> entityCollectionManager.load( ids ) );
- //now we have a collection, validate our canidate set is correct.
+ //now we have a collection, validate our canidate set is correct.
- return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
- .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
- entityCollector -> Observable.from( entityCollector.getResults() ) );
- } );
+ return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
return entityObservable;
}
@@ -132,7 +136,7 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
//doesn't exist warn and drop
if ( entity == null || !entity.getEntity().isPresent() ) {
logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue", candidateId );
+ + " Ignoring since this could be a region sync issue", candidateId );
//TODO trigger an audit after a fail count where we explicitly try to repair from other regions
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
deleted file mode 100644
index 9befb79..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-
-import java.util.Iterator;
-
-
-/**
- * Converts an iterator to an observable. Subclasses need to only implement getting the iterator from the data source.
- * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O. This allows us
- * to wrap the iterator in a deferred invocation to avoid the blocking on construction.
- */
-public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
-
- private static final Logger log = LoggerFactory.getLogger(ObservableIterator.class);
-
- private final String name;
-
-
- /**
- * @param name The simple name of the iterator, used for debugging
- */
- protected ObservableIterator( final String name ) {this.name = name;}
-
-
- @Override
- public void call( final Subscriber<? super T> subscriber ) {
-
-
- try {
- //get our iterator and push data to the observer
- final Iterator<T> itr = getIterator();
-
- Preconditions.checkNotNull(itr,
- "The observable must return an iterator. Null was returned for iterator " + name);
-
-
- //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
- while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
- final T next = itr.next();
-
- log.trace( "Iterator '{}' emitting item '{}'", name, next );
-
- subscriber.onNext( next );
- }
-
-
- subscriber.onCompleted();
- }
-
- //if any error occurs, we need to notify the observer so it can perform it's own error handling
- catch ( Throwable t ) {
- log.error( "Unable to emit items from iterator {}", name, t );
- subscriber.onError( t );
- }
- }
-
-
- /**
- * Return the iterator to feed data to
- */
- protected abstract Iterator<T> getIterator();
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 84a7fc3..57409e1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -66,7 +66,7 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
final T next = itr.next();
-// log.trace( "Iterator '{}' emitting item '{}'", name, next );
+ log.trace( "Iterator '{}' emitting item '{}'", name, next );
subscriber.onNext( next );
}