You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/02 07:44:11 UTC
[01/13] git commit: Beginnings of a rebuildable index implementation
and associated test.
Repository: incubator-usergrid
Updated Branches:
refs/heads/esbatching bba08ddc1 -> c051fdaec
Beginnings of a rebuildable index implementation and associated test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6a6171e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6a6171e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6a6171e8
Branch: refs/heads/esbatching
Commit: 6a6171e804dd7fafd85039c63401e84ca53ac8d4
Parents: 4cba3b8
Author: Dave Johnson <dm...@apigee.com>
Authored: Sat Sep 27 10:04:29 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Sat Sep 27 10:04:29 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 6 +-
.../corepersistence/CpEntityManagerFactory.java | 71 +++++-----
.../HybridEntityManagerFactory.java | 9 +-
.../persistence/EntityManagerFactory.java | 9 +-
.../cassandra/EntityManagerFactoryImpl.java | 12 +-
.../PerformanceEntityRebuildIndexTest.java | 129 +++++++++++++++++++
.../cassandra/EntityManagerFactoryImplIT.java | 8 +-
stack/core/src/test/resources/log4j.properties | 13 +-
.../usergrid/persistence/index/IndexFig.java | 7 +
.../persistence/index/impl/EsProvider.java | 15 +++
.../org/apache/usergrid/tools/IndexRebuild.java | 6 +-
stack/tools/src/main/resources/log4j.properties | 18 ++-
12 files changed, 246 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 5a5e6bc..51e660b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -211,19 +211,19 @@ public class CpEntityManager implements EntityManager {
static String getCollectionScopeNameFromEntityType( String type ) {
String csn = Schema.defaultCollectionName( type ) + COLL_SUFFIX;
- return csn;
+ return csn.toLowerCase();
}
static String getCollectionScopeNameFromCollectionName( String name ) {
String csn = name + COLL_SUFFIX;
- return csn;
+ return csn.toLowerCase();
}
static String getConnectionScopeName( String entityType, String connectionType ) {
String csn = connectionType + entityType + CONN_SUFFIX;
- return csn;
+ return csn.toLowerCase();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 3e9a6a9..491a47b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -48,7 +48,9 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
+import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexScope;
@@ -69,6 +71,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
+import rx.Observable;
/**
@@ -581,32 +584,38 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
- public void rebuildInternalIndexes() throws Exception {
+ public void rebuildInternalIndexes( ProgressObserver po ) throws Exception {
+
+ // get all connections from systems app
+// GraphManager gm = managerCache.getGraphManager( CpEntityManagerFactory.SYSTEM_APPS_SCOPE );
+//
+// Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
+// new SimpleSearchEdgeType( systemAppId, null , null ));
logger.info("Rebuilding system apps index");
rebuildIndexScope(
CpEntityManagerFactory.SYSTEM_APPS_SCOPE,
- CpEntityManagerFactory.SYSTEM_APPS_INDEX_SCOPE );
+ CpEntityManagerFactory.SYSTEM_APPS_INDEX_SCOPE, po );
logger.info("Rebuilding system orgs index");
rebuildIndexScope(
CpEntityManagerFactory.SYSTEM_ORGS_SCOPE,
- CpEntityManagerFactory.SYSTEM_ORGS_INDEX_SCOPE );
+ CpEntityManagerFactory.SYSTEM_ORGS_INDEX_SCOPE, po );
logger.info("Rebuilding system props index");
rebuildIndexScope(
CpEntityManagerFactory.SYSTEM_PROPS_SCOPE,
- CpEntityManagerFactory.SYSTEM_PROPS_INDEX_SCOPE );
+ CpEntityManagerFactory.SYSTEM_PROPS_INDEX_SCOPE, po );
logger.info("Rebuilding management application index");
- rebuildApplicationIndex( MANAGEMENT_APPLICATION_ID );
+ rebuildApplicationIndex( MANAGEMENT_APPLICATION_ID, po );
logger.info("Rebuilding default application index");
- rebuildApplicationIndex( DEFAULT_APPLICATION_ID );
+ rebuildApplicationIndex( DEFAULT_APPLICATION_ID, po );
}
- private void rebuildIndexScope( CollectionScope cs, IndexScope is ) {
+ private void rebuildIndexScope( CollectionScope cs, IndexScope is, ProgressObserver po ) {
logger.info("Rebuild index scope for {}:{}:{}", new Object[] {
cs.getOwner(), cs.getApplication(), cs.getName()
@@ -618,38 +627,37 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
Query q = Query.fromQL("select *");
CandidateResults results = ei.search( q );
- Map<String, UUID> appMap = new HashMap<String, UUID>();
-
Iterator<CandidateResult> iter = results.iterator();
while (iter.hasNext()) {
CandidateResult cr = iter.next();
Entity entity = ecm.load(cr.getId()).toBlockingObservable().last();
- if (cr.getVersion().compareTo( entity.getVersion()) < 0 ) {
- logger.warn(" Ignoring stale version uuid:{} type:{} version:{} latest version:{}",
- new Object[]{
- cr.getId().getUuid(),
- cr.getId().getType(),
- cr.getVersion(),
- entity.getVersion()
+ if ( cr.getVersion().compareTo( entity.getVersion()) < 0 ) {
+ logger.warn(" Ignoring stale version uuid:{} type:{} state v:{} latest v:{}",
+ new Object[] {
+ cr.getId().getUuid(), cr.getId().getType(),
+ cr.getVersion(), entity.getVersion()
});
- continue;
- }
- logger.info(" Updating CP Entity type: {} with id: {} for app id: {}",
- new Object[]{cr.getId().getType(), cr.getId().getUuid(),
- CpEntityManagerFactory.SYSTEM_APPS_SCOPE.getApplication().getUuid()
+ } else {
+
+ logger.info(" Updating entity type {} with id {} for app {}/{}", new Object[] {
+ cr.getId().getType(), cr.getId().getUuid(), cs.getApplication().getUuid()
+ });
+
+ ei.index(entity);
+
+ if ( po != null ) {
+ po.onProgress();
}
- );
- ei.index(entity);
+ }
}
-
}
- public void rebuildApplicationIndex( UUID appId ) throws Exception {
+ public void rebuildApplicationIndex( UUID appId, ProgressObserver po ) throws Exception {
EntityManager em = getEntityManager( appId );
@@ -659,12 +667,13 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
appId, collections.size(), collections });
for ( String collection : collections ) {
- rebuildCollectionIndex( appId, collection );
+ rebuildCollectionIndex( appId, collection, po );
}
}
- public void rebuildCollectionIndex( UUID appId, String collectionName ) throws Exception {
+ public void rebuildCollectionIndex( UUID appId, String collectionName, ProgressObserver po )
+ throws Exception {
logger.info( "Reindexing collection: {} for app id: {}", collectionName, appId );
@@ -683,15 +692,19 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
for ( org.apache.usergrid.persistence.Entity entity : r.getEntities() ) {
- logger.info( " Updating Entity name {}, type: {}, id: {} in app id: {}", new Object[] {
+ logger.info( " Updating Entity name {}, type: {}, id: {} in app id: {}", new Object[] {
entity.getName(), entity.getType(), entity.getUuid(), appId
} );
try {
em.update( entity );
+
+ if ( po != null ) {
+ po.onProgress();
+ }
}
catch ( DuplicateUniquePropertyExistsException dupee ) {
- logger.error( "Duplicate property for type: {} with id: {} for app id: {}. "
+ logger.error( " Duplicate property for type: {} with id: {} for app id: {}. "
+ "Property name: {} , value: {}", new Object[] {
entity.getType(), entity.getUuid(), appId, dupee.getPropertyName(),
dupee.getPropertyValue()
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
index 3b2fd3e..8002199 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
@@ -145,12 +145,13 @@ public class HybridEntityManagerFactory implements EntityManagerFactory, Applica
}
@Override
- public void rebuildInternalIndexes() throws Exception {
- factory.rebuildInternalIndexes();
+ public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
+ factory.rebuildInternalIndexes(po);
}
@Override
- public void rebuildCollectionIndex(UUID appId, String collectionName) throws Exception {
- factory.rebuildCollectionIndex(appId, collectionName);
+ public void rebuildCollectionIndex(UUID appId, String collectionName, ProgressObserver po)
+ throws Exception {
+ factory.rebuildCollectionIndex(appId, collectionName, po);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index 15e4dbe..bf4e47e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -117,12 +117,17 @@ public interface EntityManagerFactory {
public void refreshIndex();
- public void rebuildInternalIndexes() throws Exception;
+ public void rebuildInternalIndexes( ProgressObserver po ) throws Exception;
- public void rebuildCollectionIndex( UUID appId, String collectionName ) throws Exception;
+ public void rebuildCollectionIndex(
+ UUID appId, String collectionName, ProgressObserver po ) throws Exception;
public void setApplicationContext(ApplicationContext ac);
/** For testing purposes */
public void flushEntityManagerCaches();
+
+ public interface ProgressObserver {
+ public void onProgress();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index d1a98ec..89d8a07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -423,12 +423,8 @@ public class EntityManagerFactoryImpl implements EntityManagerFactory, Applicati
}
@Override
- public void rebuildInternalIndexes() throws Exception {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public void rebuildCollectionIndex(UUID appId, String collectionName) throws Exception {
+ public void rebuildCollectionIndex(UUID appId, String collectionName, ProgressObserver po )
+ throws Exception {
logger.info( "Reindexing collection: {} for app id: {}", collectionName, appId );
@@ -468,4 +464,8 @@ public class EntityManagerFactoryImpl implements EntityManagerFactory, Applicati
}
+ @Override
+ public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
+ // no op
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
new file mode 100644
index 0000000..35d1d50
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.Application;
+import org.apache.usergrid.CoreApplication;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import java.util.ArrayList;
+import org.apache.usergrid.cassandra.Concurrent;
+
+
+//@RunWith(JukitoRunner.class)
+//@UseModules({ GuiceModule.class })
+@Concurrent()
+public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
+ private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
+
+ private static final MetricRegistry registry = new MetricRegistry();
+
+ private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
+
+ private static final long writeDelayMs = 7;
+ private static final long readDelayMs = 7;
+
+ @Rule
+ public Application app = new CoreApplication( setup );
+
+ private Slf4jReporter reporter;
+
+
+ @Before
+ public void startReporting() {
+
+ reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
+ .convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+ reporter.start( 10, TimeUnit.SECONDS );
+ }
+
+
+ @After
+ public void printReport() {
+ reporter.report();
+ reporter.stop();
+ }
+
+
+ @Test
+ public void rebuildIndex() throws Exception {
+
+ logger.info("Started rebuildIndex()");
+
+ final EntityManager em = app.getEntityManager();
+ final long stopTime = System.currentTimeMillis() + RUNTIME;
+ final Map<String, Object> entityMap = new HashMap<>();
+
+ entityMap.put( "key1", 1000 );
+ entityMap.put( "key2", 2000 );
+ entityMap.put( "key3", "Some value" );
+
+ List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+
+ int i = 0;
+ while ( System.currentTimeMillis() < stopTime ) {
+
+ entityMap.put( "key", i );
+ final Entity created = em.create("testType", entityMap );
+
+ entityRefs.add( new SimpleEntityRef( created.getType(), created.getUuid() ) );
+
+ i++;
+
+ if ( i % 1000 == 0 ) {
+ logger.debug("rebuildIndex() Created {} entities",i );
+ }
+ Thread.sleep( writeDelayMs );
+ }
+ logger.info("rebuildIndex() Created {} entities", i);
+
+ final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
+ final Meter meter = registry.meter( meterName );
+
+ EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ @Override
+ public void onProgress() {
+ meter.mark();
+ }
+ };
+
+ setup.getEmf().rebuildInternalIndexes( po );
+
+ setup.getEmf().rebuildCollectionIndex( app.getId(), "testTypes", po);
+
+ registry.remove( meterName );
+ logger.info("Finished rebuildIndex()");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index a21fea0..efd9dfb 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +93,13 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
}
+ public void testRebuildIndexes() throws Exception {
+
+
+ }
+
+
@Test
- @Ignore("Fix this EntityManagerFactoryImplIT.testCreateAndGet:105->createApplication:90 ยป ApplicationAlreadyExists")
public void testCreateAndGet() throws Exception {
TraceTag traceTag = traceTagManager.create( "testCreateAndGet" );
traceTagManager.attach( traceTag );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index 51f6112..2aa9983 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -39,15 +39,22 @@ log4j.logger.org.apache.usergrid.rest.security.AllowAjaxFilter=WARN, stdout
log4j.logger.me.prettyprint.hector.api.beans.AbstractComposite=ERROR, stdout
log4j.logger.org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl=DEBUG, stdout
-#log4j.logger.org.apache.usergrid.persistence.PerformanceEntityReadTest=DEBUG
+log4j.logger.org.apache.usergrid.persistence.PerformanceEntityReadTest=DEBUG
+log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=DEBUG
#log4j.logger.org.apache.usergrid.persistence=INFO
-#log4j.logger.org.apache.usergrid.corepersistence=DEBUG
+
+log4j.logger.org.apache.usergrid.corepersistence=DEBUG
+#log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO
+#log4j.logger.org.apache.usergrid.corepersistence.CpEntityManagerFactory=DEBUG
+#log4j.logger.org.apache.usergrid.corepersistence.CpEntityManager=DEBUG
+#log4j.logger.org.apache.usergrid.corepersistence.CpRelationManager=DEBUG
+
#log4j.logger.com.netflix.hystrix=DEBUG
#log4j.logger.org.antlr=DEBUG
#log4j.logger.org.apache.usergrid.persistence.CollectionIT=DEBUG
-#log4j.logger.org.apache.usergrid.persistence.index=DEBUG
#log4j.logger.org.apache.usergrid.persistence.collection=DEBUG
+log4j.logger.org.apache.usergrid.persistence.index=DEBUG
#log4j.logger.org.elasticsearch=DEBUG
#log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG, stdout
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 5d59f61..142c48a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -33,6 +33,8 @@ public interface IndexFig extends GuicyFig {
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster_name";
+ public static final String ELASTICSEARCH_NODENAME = "elasticsearch.node_name";
+
public static final String ELASTICSEARCH_INDEX_PREFIX = "elasticsearch.index_prefix";
public static final String ELASTICSEARCH_STARTUP = "elasticsearch.startup";
@@ -75,4 +77,9 @@ public interface IndexFig extends GuicyFig {
@Default( "false" )
@Key( ELASTICSEARCH_FORCE_REFRESH )
public boolean isForcedRefresh();
+
+ /** Identify the client node with a unique name. */
+ @Default("default")
+ @Key( ELASTICSEARCH_NODENAME )
+ public String getNodeName();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index 83a3eac..a9228ee 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -22,7 +22,10 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Properties;
+import java.util.logging.Level;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.core.util.AvailablePortFinder;
@@ -133,7 +136,18 @@ public class EsProvider {
// we will connect to forked ES on localhost
allHosts = "localhost:" + System.getProperty(LOCAL_ES_PORT_PROPNAME);
+ }
+
+ String nodeName = fig.getNodeName();
+ if ( "default".equals( nodeName )) {
+ // no nodeName was specified, use hostname
+ try {
+ nodeName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException ex) {
+ nodeName = "client-" + RandomStringUtils.randomAlphabetic(8);
+ log.warn("Couldn't get hostname to use as ES node name, using " + nodeName);
+ }
}
Settings settings = ImmutableSettings.settingsBuilder()
@@ -149,6 +163,7 @@ public class EsProvider {
.put("client.transport.ping_timeout", 2000) // milliseconds
.put("client.transport.nodes_sampler_interval", 100)
.put("network.tcp.blocking", true)
+ .put("node.name", nodeName )
.build();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
index e3d6560..81d0c1c 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
@@ -94,7 +94,7 @@ public class IndexRebuild extends ToolBase {
logger.info( "Starting index rebuild" );
- emf.rebuildInternalIndexes();
+ emf.rebuildInternalIndexes( null );
emf.refreshIndex();
/**
@@ -106,7 +106,7 @@ public class IndexRebuild extends ToolBase {
Set<String> collections = getCollections( line, appId );
for ( String collection : collections ) {
- emf.rebuildCollectionIndex(appId, collection);
+ emf.rebuildCollectionIndex( appId, collection, null );
emf.refreshIndex();
}
}
@@ -135,7 +135,7 @@ public class IndexRebuild extends ToolBase {
System.out.println( "Printing all apps" );
for ( Entry<String, UUID> entry : ids.entrySet() ) {
- System.out.println( entry.getKey() );
+ System.out.println( entry.getKey() + " appid=" + entry.getValue() );
}
return ids.values();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6a6171e8/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index e9c23e5..b993313 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
-log4j.rootLogger=INFO,stdout
+log4j.rootLogger=ERROR,stdout
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
-log4j.category.org.apache.usergrid.tools=TRACE, stdout
+log4j.category.org.apache.usergrid.tools=TRACE
+log4j.category.org.apache.usergrid=ERROR
log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN, stdout
log4j.logger.org.apache.usergrid.persistence.cassandra.BATCH=WARN, stdout
@@ -38,8 +39,15 @@ log4j.logger.me.prettyprint.cassandra.hector.TimingLogger=WARN, stdout
log4j.logger.org.apache.usergrid.rest.security.AllowAjaxFilter=WARN, stdout
log4j.logger.me.prettyprint.hector.api.beans.AbstractComposite=ERROR, stdout
#log4j.logger.org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl=DEBUG, stdout
-
-log4j.logger.org.apache.usergrid.persistence.hector.CountingMutator=INFO, stdout
-
+#log4j.logger.org.apache.usergrid.persistence.hector.CountingMutator=INFO, stdout
#log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG, stdout
+#log4j.logger.org.apache.usergrid.corepersistence=INFO
+#log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO
+log4j.logger.org.apache.usergrid.corepersistence.CpEntityManagerFactory=DEBUG
+#log4j.logger.org.apache.usergrid.corepersistence.CpEntityManager=DEBUG
+#log4j.logger.org.apache.usergrid.corepersistence.CpRelationManager=DEBUG
+
+#log4j.logger.org.apache.usergrid.persistence.collection=INFO
+#log4j.logger.org.apache.usergrid.persistence.index=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
[13/13] git commit: Updated batching calls
Posted by to...@apache.org.
Updated batching calls
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c051fdae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c051fdae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c051fdae
Branch: refs/heads/esbatching
Commit: c051fdaecf3c7e86ce6188a7f56dd2310e1d1ec1
Parents: ae9974d
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 23:44:03 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 23:44:03 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 55 ++++++++++++--------
.../corepersistence/CpRelationManager.java | 21 +++++---
.../index/impl/EsEntityIndexImpl.java | 7 +--
3 files changed, 51 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c051fdae/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 5ce871b..8b8dc8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -626,7 +626,7 @@ public class CpEntityManager implements EntityManager {
logger.debug( "Deleting indexes of all {} collections owning the entity",
owners.keySet().size() );
- final EntityIndex ei = managerCache.getEntityIndex( appScope );
+ final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
final EntityIndexBatch batch = ei.createBatch();
@@ -1054,7 +1054,7 @@ public class CpEntityManager implements EntityManager {
getCollectionScopeNameFromEntityType( entityRef.getType()) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
- EntityIndex ei = managerCache.getEntityIndex( appScope );
+ EntityIndex ei = managerCache.getEntityIndex( applicationScope );
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
@@ -2611,12 +2611,12 @@ public class CpEntityManager implements EntityManager {
}
// Index CP entity into default collection scope
- IndexScope defaultIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
- EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
- ei.index( cpEntity );
+// IndexScope defaultIndexScope = new IndexScopeImpl(
+// applicationScope.getApplication(),
+// applicationScope.getApplication(),
+// CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
+// EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+// ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
// reflect changes in the legacy Entity
entity.setUuid( cpEntity.getId().getUuid() );
@@ -3033,7 +3033,7 @@ public class CpEntityManager implements EntityManager {
collName, collectionEntity.getId().getType(), collectionEntity.getId().getUuid(),
memberEntity.getId().getType(), memberEntity.getId().getUuid() });
- indexEntityIntoCollection( collectionEntity, memberEntity, collName );
+ indexEntityIntoCollection( collectionEntity, memberEntity, collName);
CollectionInfo collection = getDefaultSchema()
.getCollection( memberEntity.getId().getType(), collName);
@@ -3053,58 +3053,71 @@ public class CpEntityManager implements EntityManager {
org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
org.apache.usergrid.persistence.model.entity.Entity targetEntity,
String targetEntityType,
- String connType ) {
+ String connType) {
logger.debug("Indexing into connection {} source {}:{} target {}:{}", new Object[] {
connType, sourceEntity.getId().getType(), sourceEntity.getId().getUuid(),
targetEntity.getId().getType(), targetEntity.getId().getUuid() });
+
+ final EntityIndex ei = getManagerCache().getEntityIndex( applicationScope );
+ final EntityIndexBatch batch = ei.createBatch();
+
+
+
+
// Index the new connection in app|source|type context
IndexScope indexScope = new IndexScopeImpl(
applicationScope.getApplication(),
sourceEntity.getId(),
CpEntityManager.getConnectionScopeName(targetEntityType, connType));
- EntityIndex ei = managerCache.getEntityIndex(indexScope);
- ei.index(targetEntity);
+ batch.index(indexScope, targetEntity);
// Index the new connection in app|scope|all-types context
IndexScope allTypesIndexScope = new IndexScopeImpl(
applicationScope.getApplication(),
sourceEntity.getId(),
ALL_TYPES);
- EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
- aei.index(targetEntity);
+
+ batch.index(allTypesIndexScope, targetEntity);
+
+ batch.execute();
}
void indexEntityIntoCollection(
org.apache.usergrid.persistence.model.entity.Entity collectionEntity,
org.apache.usergrid.persistence.model.entity.Entity memberEntity,
- String collName ) {
+ String collName) {
+
+ final EntityIndex ei = getManagerCache().getEntityIndex( applicationScope );
+ final EntityIndexBatch batch = ei.createBatch();
// index member into entity collection | type scope
IndexScope collectionIndexScope = new IndexScopeImpl(
applicationScope.getApplication(),
collectionEntity.getId(),
CpEntityManager.getCollectionScopeNameFromCollectionName(collName));
- EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope);
- collectionIndex.index(memberEntity);
+
+ batch.index(collectionIndexScope, memberEntity);
// index member into entity | all-types scope
IndexScope entityAllTypesScope = new IndexScopeImpl(
applicationScope.getApplication(),
collectionEntity.getId(),
ALL_TYPES);
- EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope);
- entityAllCollectionIndex.index(memberEntity);
+
+ batch.index(entityAllTypesScope, memberEntity);
// index member into application | all-types scope
IndexScope appAllTypesScope = new IndexScopeImpl(
applicationScope.getApplication(),
applicationScope.getApplication(),
ALL_TYPES);
- EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope);
- allCollectionIndex.index(memberEntity);
+
+ batch.index(appAllTypesScope, memberEntity);
+
+ batch.execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c051fdae/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 0073eb1..14f9359 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -16,6 +16,8 @@
package org.apache.usergrid.corepersistence;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.CpEntityManager.*;
import com.yammer.metrics.annotation.Metered;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
@@ -30,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,6 +143,11 @@ import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
import rx.Observable;
@@ -917,7 +925,7 @@ public class CpRelationManager implements RelationManager {
while ( !satisfied && queryCount++ < maxQueries ) {
- CandidateResults crs = ei.search( query );
+ CandidateResults crs = ei.search( indexScope, query );
if ( results == null ) {
results = buildResults( query, crs, collName );
@@ -1009,23 +1017,24 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
+ EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+ EntityIndexBatch batch = ei.createBatch();
+
// Index the new connection in app|source|type context
IndexScope indexScope = new IndexScopeImpl(
applicationScope.getApplication(),
cpHeadEntity.getId(),
CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
- EntityIndex ei = managerCache.getEntityIndex(indexScope);
- ei.index( targetEntity );
+
+ batch.index(indexScope, targetEntity );
// Index the new connection in app|scope|all-types context
IndexScope allTypesIndexScope = new IndexScopeImpl(
applicationScope.getApplication(),
cpHeadEntity.getId(),
ALL_TYPES);
- EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
- aei.index( targetEntity );
+ batch.index(allTypesIndexScope, targetEntity );
- batch.index( allTypesIndexScope, targetEntity );
batch.execute();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c051fdae/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 7851afe..264eb1d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -26,6 +26,7 @@ import java.util.TreeSet;
import java.util.UUID;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
@@ -308,9 +309,5 @@ public class EsEntityIndexImpl implements EntityIndex {
}
- @Override
- public void refresh() {
- client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
- log.debug( "Refreshed index: " + indexName );
- }
+
}
[02/13] git commit: Beginning of some new migration methods.
Posted by to...@apache.org.
Beginning of some new migration methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/58fc540a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/58fc540a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/58fc540a
Branch: refs/heads/esbatching
Commit: 58fc540ad28476b3b4c166a77ee9fe7cbd3412a7
Parents: 6a6171e
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Sep 29 17:26:56 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Sep 29 17:26:56 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 186 ++++++++++++++++++-
1 file changed, 177 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/58fc540a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 491a47b..5ca9536 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -91,7 +91,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public static final Class<DynamicEntity> APPLICATION_ENTITY_CLASS = DynamicEntity.class;
// The System Application where we store app and org metadata
- public static final String SYSTEM_APPS_UUID = "b6768a08-b5d5-11e3-a495-10ddb1de66c3";
+ public static final UUID SYSTEM_APP_ID =
+ UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3");
public static final UUID MANAGEMENT_APPLICATION_ID =
UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8");
@@ -99,26 +100,41 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public static final UUID DEFAULT_APPLICATION_ID =
UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
- // Three types of things we store in System Application
+
+ @Deprecated // use system app for these in future
public static final String SYSTEM_APPS_TYPE = "zzzappszzz";
+
+ @Deprecated
public static final String SYSTEM_ORGS_TYPE = "zzzorgszzz";
+
+ @Deprecated
public static final String SYSTEM_PROPS_TYPE = "zzzpropszzz";
+ @Deprecated // use system app for these in future
private static final Id systemAppId =
- new SimpleId( UUID.fromString(SYSTEM_APPS_UUID), SYSTEM_APPS_TYPE );
-
+ new SimpleId( SYSTEM_APP_ID, SYSTEM_APPS_TYPE );
+
+ @Deprecated
public static final CollectionScope SYSTEM_APPS_SCOPE =
new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_APPS_TYPE );
+
+ @Deprecated
public static final IndexScope SYSTEM_APPS_INDEX_SCOPE =
new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_APPS_TYPE);
+ @Deprecated
public static final CollectionScope SYSTEM_ORGS_SCOPE =
new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_ORGS_TYPE);
+
+ @Deprecated
public static final IndexScope SYSTEM_ORGS_INDEX_SCOPE =
new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_ORGS_TYPE);
+ @Deprecated
public static final CollectionScope SYSTEM_PROPS_SCOPE =
new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_PROPS_TYPE);
+
+ @Deprecated
public static final IndexScope SYSTEM_PROPS_INDEX_SCOPE =
new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_PROPS_TYPE);
@@ -142,16 +158,29 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private static final int REBUILD_PAGE_SIZE = 100;
- public CpEntityManagerFactory(
- CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters ) {
+ public CpEntityManagerFactory(
+ CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters) {
this.cass = cass;
this.counterUtils = counterUtils;
this.skipAggregateCounters = skipAggregateCounters;
- if ( skipAggregateCounters ) {
- logger.warn( "NOTE: Counters have been disabled by configuration..." );
+ if (skipAggregateCounters) {
+ logger.warn("NOTE: Counters have been disabled by configuration...");
+ }
+
+ // if system app does have apps, orgs and props then populate it
+ try {
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+ Results orgs = em.searchCollection(em.getApplicationRef(), "organizations", null);
+ if (orgs.isEmpty()) {
+ populateSystemAppsFromEs();
+ populateSystemOrgsFromEs();
+ populateSystemPropsFromEs();
+ }
+
+ } catch (Exception ex) {
+ throw new RuntimeException("Fatal error migrating data", ex);
}
- logger.debug("Created a new CpEntityManagerFactory");
}
@@ -726,4 +755,143 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
em.flushManagerCaches();
}
}
+
+
+ private void populateSystemOrgsFromEs() throws Exception {
+
+ logger.info("Migrating system orgs");
+
+ EntityCollectionManager ecm = getManagerCache()
+ .getEntityCollectionManager(SYSTEM_ORGS_SCOPE);
+ EntityIndex ei = getManagerCache()
+ .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
+
+ EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
+
+ String cursor = null;
+ boolean done = false;
+
+ while ( !done ) {
+
+ Query q = Query.fromQL("select *");
+ q.setCursor( cursor );
+
+ CandidateResults results = ei.search( q );
+ cursor = results.getCursor();
+
+ Iterator<CandidateResult> iter = results.iterator();
+ while ( iter.hasNext() ) {
+
+ CandidateResult cr = iter.next();
+ Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
+
+ if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
+ logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+ cr.getVersion(), e.getVersion()});
+ continue;
+ }
+
+ Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
+ systemAppEm.create("organization", entityMap );
+ }
+
+ if ( cursor == null ) {
+ done = true;
+ }
+ }
+ }
+
+
+ private void populateSystemAppsFromEs() throws Exception {
+
+ logger.info("Migrating system apps");
+
+ EntityCollectionManager ecm = getManagerCache()
+ .getEntityCollectionManager(SYSTEM_APPS_SCOPE );
+ EntityIndex ei = getManagerCache()
+ .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
+
+ EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
+
+ String cursor = null;
+ boolean done = false;
+
+ while ( !done ) {
+
+ Query q = Query.fromQL("select *");
+ q.setCursor( cursor );
+
+ CandidateResults results = ei.search( q );
+ cursor = results.getCursor();
+
+ Iterator<CandidateResult> iter = results.iterator();
+ while ( iter.hasNext() ) {
+
+ CandidateResult cr = iter.next();
+ Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
+
+ if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
+ logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+ cr.getVersion(), e.getVersion()});
+ continue;
+ }
+
+ Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
+ systemAppEm.create("application", entityMap );
+ }
+
+ if ( cursor == null ) {
+ done = true;
+ }
+ }
+ }
+
+
+ private void populateSystemPropsFromEs() throws Exception {
+
+ logger.info("Migrating system props");
+
+ EntityCollectionManager ecm = getManagerCache()
+ .getEntityCollectionManager(SYSTEM_PROPS_SCOPE );
+ EntityIndex ei = getManagerCache()
+ .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
+
+ EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
+
+ String cursor = null;
+ boolean done = false;
+
+ while ( !done ) {
+
+ Query q = Query.fromQL("select *");
+ q.setCursor( cursor );
+
+ CandidateResults results = ei.search( q );
+ cursor = results.getCursor();
+
+ Iterator<CandidateResult> iter = results.iterator();
+ while ( iter.hasNext() ) {
+
+ CandidateResult cr = iter.next();
+ Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
+
+ if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
+ logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+ cr.getVersion(), e.getVersion()});
+ continue;
+ }
+
+ Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
+ systemAppEm.create("property", entityMap );
+ }
+
+ if ( cursor == null ) {
+ done = true;
+ }
+ }
+ }
+
}
[03/13] Changes to 1) ensure that the CpEntityManager's ElasticSearch
can be rebuilt entirely from data stored in Cassandra,
2) provide support Index Rebuild in two-dot-o and the beginnings of an Index
Rebuild test.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 0345e23..4c60e82 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -16,9 +16,11 @@
package org.apache.usergrid.corepersistence;
+import com.yammer.metrics.annotation.Metered;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import static java.util.Arrays.asList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -27,12 +29,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
-import org.apache.usergrid.utils.UUIDUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static org.apache.usergrid.corepersistence.CpEntityManager.getEdgeTypeFromCollectionName;
+import static org.apache.usergrid.corepersistence.CpEntityManager.getEdgeTypeFromConnectionType;
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
@@ -45,12 +48,43 @@ import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.RoleRef;
import org.apache.usergrid.persistence.Schema;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import org.apache.usergrid.persistence.cassandra.CassandraService;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
import org.apache.usergrid.persistence.cassandra.IndexUpdate;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
import org.apache.usergrid.persistence.cassandra.QueryProcessorImpl;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
@@ -96,56 +130,18 @@ import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
import org.apache.usergrid.persistence.schema.CollectionInfo;
-import org.apache.usergrid.utils.IndexUtils;
-import org.apache.usergrid.utils.MapUtils;
-
-import com.yammer.metrics.annotation.Metered;
-
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.mutation.Mutator;
-import rx.Observable;
-
-import static java.util.Arrays.asList;
-
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
-import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
-import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
-import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
+import org.apache.usergrid.utils.IndexUtils;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
+import org.apache.usergrid.utils.MapUtils;
import static org.apache.usergrid.utils.MapUtils.addMapSet;
+import org.apache.usergrid.utils.UUIDUtils;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+import rx.Observable;
/**
@@ -155,11 +151,11 @@ public class CpRelationManager implements RelationManager {
private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
- public static final String ALL_TYPES = "zzzalltypeszzz";
+ static final String ALL_TYPES = "zzzalltypeszzz";
- private static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
+ static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
- private static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
+ static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
private CpEntityManagerFactory emf;
@@ -239,59 +235,6 @@ public class CpRelationManager implements RelationManager {
}
- static String getEdgeTypeFromConnectionType( String connectionType, String targetEntityType ) {
-
- if ( connectionType != null && targetEntityType != null ) {
- String csn = connectionType + "|" + targetEntityType + "|" + EDGE_CONN_SUFFIX;
- return csn;
- }
-
- if ( connectionType != null ) {
- // no suffix, this must be a search
- String csn = connectionType;
- return csn;
- }
-
- return null;
- }
-
-
- static String getEdgeTypeFromCollectionName( String collectionName, String targetEntityType ) {
-
- if ( collectionName != null && targetEntityType != null ) {
- String csn = collectionName + "|" + targetEntityType + "|" + EDGE_COLL_SUFFIX;
- return csn;
- }
-
- if ( collectionName != null ) {
- // no suffix, this must be a search
- String csn = collectionName;
- return csn;
- }
-
- return null;
- }
-
-
- static boolean isCollectionEdgeType( String type ) {
- return type.endsWith( EDGE_COLL_SUFFIX );
- }
-
- static boolean isConnectionEdgeType( String type ) {
- return type.endsWith( EDGE_CONN_SUFFIX );
- }
-
- public String getConnectionName( String edgeType ) {
- String[] parts = edgeType.split("\\|");
- return parts[0];
- }
-
- public String getCollectionName( String edgeType ) {
- String[] parts = edgeType.split("\\|");
- return parts[0];
- }
-
-
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
final Set<String> indexes = new HashSet<String>();
@@ -387,10 +330,10 @@ public class CpRelationManager implements RelationManager {
edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
String name = null;
- if ( isConnectionEdgeType( edge.getType() )) {
- name = getConnectionName( edge.getType() );
+ if ( CpEntityManager.isConnectionEdgeType( edge.getType() )) {
+ name = CpEntityManager.getConnectionType( edge.getType() );
} else {
- name = getCollectionName( edge.getType() );
+ name = CpEntityManager.getCollectionName( edge.getType() );
}
addMapSet( results, eref, name );
}
@@ -445,9 +388,9 @@ public class CpRelationManager implements RelationManager {
// reindex the entity in the source entity's collection or connection index
IndexScope indexScope;
- if ( isCollectionEdgeType( edge.getType() )) {
+ if ( CpEntityManager.isCollectionEdgeType( edge.getType() )) {
- String collName = getCollectionName( edge.getType() );
+ String collName = CpEntityManager.getCollectionName( edge.getType() );
indexScope = new IndexScopeImpl(
applicationScope.getApplication(),
new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
@@ -455,7 +398,7 @@ public class CpRelationManager implements RelationManager {
} else {
- String connName = getCollectionName( edge.getType() );
+ String connName = CpEntityManager.getCollectionName( edge.getType() );
indexScope = new IndexScopeImpl(
applicationScope.getApplication(),
new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
@@ -545,7 +488,7 @@ public class CpRelationManager implements RelationManager {
Observable<Edge> edgesToTarget = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
targetId,
- CpRelationManager.getEdgeTypeFromConnectionType( connectionType, target.getType() ),
+ CpEntityManager.getEdgeTypeFromConnectionType( connectionType, target.getType() ),
System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,
null)); // last
@@ -569,7 +512,7 @@ public class CpRelationManager implements RelationManager {
Observable<Edge> edgesFromSource = gm.loadEdgesFromSource(new SimpleSearchByEdgeType(
sourceId,
- CpRelationManager.getEdgeTypeFromConnectionType( connectionType, null ),
+ CpEntityManager.getEdgeTypeFromConnectionType( connectionType, null ),
System.currentTimeMillis(),SearchByEdgeType.Order.DESCENDING,
null)); // last
@@ -587,12 +530,12 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
Observable<String> str = gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( cpHeadEntity.getId(),null , null ));
+ new SimpleSearchEdgeType( cpHeadEntity.getId(), null , null ));
Iterator<String> iter = str.toBlockingObservable().getIterator();
while ( iter.hasNext() ) {
String edgeType = iter.next();
- indexes.add( getCollectionName( edgeType ) );
+ indexes.add( CpEntityManager.getCollectionName( edgeType ) );
}
return indexes;
@@ -670,7 +613,8 @@ public class CpRelationManager implements RelationManager {
}
if ( logger.isDebugEnabled() ) {
- logger.debug("Loaded member entity {}:{} from scope\n app {}\n owner {}\n name {} data {}",
+ logger.debug("Loaded member entity {}:{} from scope\n app {}\n "
+ + "owner {}\n name {} data {}",
new Object[] {
itemRef.getType(),
itemRef.getUuid(),
@@ -683,12 +627,10 @@ public class CpRelationManager implements RelationManager {
String edgeType = getEdgeTypeFromCollectionName( collName, memberEntity.getId().getType() );
- logger.debug("addToCollection(): Creating edge type {} from {}:{} to {}:{}",
- new Object[] {
- edgeType,
- headEntity.getType(), headEntity.getUuid(),
- itemRef.getType(), itemRef.getUuid() });
- UUID timeStampUuid = memberEntity.getId().getUuid() != null && UUIDUtils.isTimeBased( memberEntity.getId().getUuid()) ? memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
+ UUID timeStampUuid = memberEntity.getId().getUuid() != null
+ && UUIDUtils.isTimeBased( memberEntity.getId().getUuid())
+ ? memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
+
long uuidHash = UUIDUtils.getUUIDLong(timeStampUuid);
// create graph edge connection from head entity to member entity
@@ -700,37 +642,20 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
- // index member into entity collection | type scope
- IndexScope collectionIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- cpHeadEntity.getId(),
- CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
- EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope);
- collectionIndex.index( memberEntity );
-
- // index member into entity | all-types scope
- IndexScope entityAllTypesScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- cpHeadEntity.getId(),
- ALL_TYPES);
- EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope);
- entityAllCollectionIndex.index( memberEntity );
+ logger.debug("\n\nWrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}\n\n", new Object[] {
+ edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+ memberEntity.getId().getType(), memberEntity.getId().getUuid(),
+ applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
- // index member into application | all-types scope
- IndexScope appAllTypesScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- ALL_TYPES);
- EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope);
- allCollectionIndex.index( memberEntity );
+ ((CpEntityManager)em).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
- logger.debug("Added entity {}:{} to collection {}", new String[] {
+ logger.debug("Added entity {}:{} to collection {}", new Object[] {
itemRef.getUuid().toString(), itemRef.getType(), collName });
- logger.debug("With head entity scope is {}:{}:{}", new String[] {
- headEntityScope.getApplication().toString(),
- headEntityScope.getOwner().toString(),
- headEntityScope.getName()});
+// logger.debug("With head entity scope is {}:{}:{}", new Object[] {
+// headEntityScope.getApplication().toString(),
+// headEntityScope.getOwner().toString(),
+// headEntityScope.getName()});
if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
getRelationManager( itemEntity )
@@ -969,53 +894,49 @@ public class CpRelationManager implements RelationManager {
query.setEntityType( collection.getType() );
query = adjustQuery( query );
- CandidateResults crs = ei.search( query );
+ // Because of possible stale entities, which are filtered out by buildResults(),
+ // we loop until the we've got enough results to satisfy the query limit.
- return buildResults( query, crs, collName );
-
-// // Because of possible stale entities, which are filtered out by buildResults(),
-// // we loop until the we've got enough results to satisfy the query limit.
-//
-// int maxQueries = 10; // max re-queries to satisfy query limit
-//
-// Results results = null;
-// int queryCount = 0;
-// int originalLimit = query.getLimit();
-// boolean satisfied = false;
-//
-// while ( !satisfied && queryCount++ < maxQueries ) {
-//
-// CandidateResults crs = ei.search( query );
-//
-// if ( results == null ) {
-// results = buildResults( query, crs, collName );
-//
-// } else {
-// Results newResults = buildResults( query, crs, collName );
-// results.merge( newResults );
-// }
-//
-// if ( crs.isEmpty() ) { // no more results
-// satisfied = true;
-//
-// } else if ( results.size() == query.getLimit() ) { // got what we need
-// satisfied = true;
-//
-// } else if ( crs.hasCursor() ) {
-// satisfied = false;
-//
-// // need to query for more
-// // ask for just what we need to satisfy, don't want to exceed limit
-// query.setCursor( results.getCursor() );
-// query.setLimit( originalLimit - results.size() );
-//
-// logger.warn("Satisfy query limit {}, new limit {} query count {}", new Object[] {
-// originalLimit, query.getLimit(), queryCount
-// });
-// }
-// }
-//
-// return results;
+ int maxQueries = 10; // max re-queries to satisfy query limit
+
+ Results results = null;
+ int queryCount = 0;
+ int originalLimit = query.getLimit();
+ boolean satisfied = false;
+
+ while ( !satisfied && queryCount++ < maxQueries ) {
+
+ CandidateResults crs = ei.search( query );
+
+ if ( results == null ) {
+ results = buildResults( query, crs, collName );
+
+ } else {
+ Results newResults = buildResults( query, crs, collName );
+ results.merge( newResults );
+ }
+
+ if ( crs.isEmpty() ) { // no more results
+ satisfied = true;
+
+ } else if ( results.size() == query.getLimit() ) { // got what we need
+ satisfied = true;
+
+ } else if ( crs.hasCursor() ) {
+ satisfied = false;
+
+ // need to query for more
+ // ask for just what we need to satisfy, don't want to exceed limit
+ query.setCursor( results.getCursor() );
+ query.setLimit( originalLimit - results.size() );
+
+ logger.warn("Satisfy query limit {}, new limit {} query count {}", new Object[] {
+ originalLimit, query.getLimit(), queryCount
+ });
+ }
+ }
+
+ return results;
}
@@ -1065,20 +986,9 @@ public class CpRelationManager implements RelationManager {
new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ))
.toBlockingObservable().last();
- String edgeType = CpRelationManager
+ String edgeType = CpEntityManager
.getEdgeTypeFromConnectionType( connectionType, connectedEntityRef.getType() );
- if ( logger.isDebugEnabled() ) {
- logger.debug("createConnection(): "
- + "Creating edge type {} \n from {}:{}\n to {}:{}\n in scope {}",
- new Object[] {
- edgeType,
- headEntity.getType(), headEntity.getUuid(),
- connectedEntityRef.getType(), connectedEntityRef.getUuid(),
- applicationScope.getApplication()
- });
- }
-
// create graph edge connection from head entity to member entity
Edge edge = new SimpleEdge(
cpHeadEntity.getId(),
@@ -1088,21 +998,13 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
- // Index the new connection in app|source|type context
- IndexScope indexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- cpHeadEntity.getId(),
- CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
- EntityIndex ei = managerCache.getEntityIndex(indexScope);
- ei.index( targetEntity );
+ logger.debug("\n\nWrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}\n\n", new Object[] {
+ edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+ targetEntity.getId().getType(), targetEntity.getId().getUuid(),
+ applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
- // Index the new connection in app|scope|all-types context
- IndexScope allTypesIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
- cpHeadEntity.getId(),
- ALL_TYPES);
- EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
- aei.index( targetEntity );
+ ((CpEntityManager)em).indexEntityIntoConnection(
+ cpHeadEntity, targetEntity, connectedEntityRef.getType(), connectionType );
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = createMutator( ko, be );
@@ -1338,7 +1240,9 @@ public class CpRelationManager implements RelationManager {
@Override
public Set<String> getConnectionTypes(boolean filterConnection) throws Exception {
- Set<String> connections = cast( em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
+ Set<String> connections = cast(
+ em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
+
if ( connections == null ) {
return null;
}
@@ -1435,7 +1339,7 @@ public class CpRelationManager implements RelationManager {
// looking for edges to the head entity
String edgeType =
- CpRelationManager.getEdgeTypeFromConnectionType( connType, headEntity.getType() );
+ CpEntityManager.getEdgeTypeFromConnectionType( connType, headEntity.getType() );
Map<EntityRef, Set<String>> containers =
getContainers( count, edgeType, fromEntityType );
@@ -2522,4 +2426,6 @@ public class CpRelationManager implements RelationManager {
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
index 2a5da5e..9063775 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
@@ -17,32 +17,21 @@
package org.apache.usergrid.corepersistence;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.mq.cassandra.QueuesCF;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.cassandra.ApplicationCF;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
-import org.apache.usergrid.persistence.cassandra.Setup;
-import org.apache.usergrid.persistence.core.migration.MigrationException;
-import org.apache.usergrid.persistence.core.migration.MigrationManager;
-import org.apache.usergrid.persistence.entities.Application;
-
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.netflix.config.ConfigurationManager;
-
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.Level;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.ddl.ComparatorType;
-
import static me.prettyprint.hector.api.factory.HFactory.createColumnFamilyDefinition;
import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.mq.cassandra.QueuesCF;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.cassandra.ApplicationCF;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.getCfDefs;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
import static org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF;
import static org.apache.usergrid.persistence.cassandra.CassandraService.DEFAULT_APPLICATION;
import static org.apache.usergrid.persistence.cassandra.CassandraService.DEFAULT_ORGANIZATION;
@@ -54,6 +43,13 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.SYSTEM_
import static org.apache.usergrid.persistence.cassandra.CassandraService.TOKENS_CF;
import static org.apache.usergrid.persistence.cassandra.CassandraService.USE_VIRTUAL_KEYSPACES;
import static org.apache.usergrid.persistence.cassandra.CassandraService.keyspaceForApplication;
+import org.apache.usergrid.persistence.cassandra.Setup;
+import org.apache.usergrid.persistence.core.migration.MigrationException;
+import org.apache.usergrid.persistence.core.migration.MigrationManager;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -168,11 +164,19 @@ public class CpSetup implements Setup {
logger.info("Setting up default applications");
- emf.initializeApplication( DEFAULT_ORGANIZATION,
- emf.getDefaultAppId(), DEFAULT_APPLICATION, null );
+ try {
+ emf.initializeApplication( DEFAULT_ORGANIZATION,
+ emf.getDefaultAppId(), DEFAULT_APPLICATION, null );
+ } catch (ApplicationAlreadyExistsException ex) {
+ logger.warn("Application {}/{} already exists", DEFAULT_ORGANIZATION, DEFAULT_APPLICATION);
+ }
- emf.initializeApplication( DEFAULT_ORGANIZATION,
- emf.getManagementAppId(), MANAGEMENT_APPLICATION, null );
+ try {
+ emf.initializeApplication( DEFAULT_ORGANIZATION,
+ emf.getManagementAppId(), MANAGEMENT_APPLICATION, null );
+ } catch (ApplicationAlreadyExistsException ex) {
+ logger.warn("Application {}/{} already exists", DEFAULT_ORGANIZATION, MANAGEMENT_APPLICATION);
+ }
}
@@ -181,12 +185,7 @@ public class CpSetup implements Setup {
return SystemDefaults.managementApp;
}
-
- /** @return statically constructed reference to the default application */
-// public static Application getDefaultApp() {
-// return SystemDefaults.defaultApp;
-// }
-
+
@Override
public void setupSystemKeyspace() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
index 8002199..5897d6c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
@@ -150,8 +150,17 @@ public class HybridEntityManagerFactory implements EntityManagerFactory, Applica
}
@Override
- public void rebuildCollectionIndex(UUID appId, String collectionName, ProgressObserver po)
- throws Exception {
- factory.rebuildCollectionIndex(appId, collectionName, po);
+ public void rebuildAllIndexes(ProgressObserver po) throws Exception {
+ factory.rebuildAllIndexes(po);
+ }
+
+ @Override
+ public void rebuildApplicationIndexes(UUID appId, ProgressObserver po) throws Exception {
+ factory.rebuildApplicationIndexes(appId, po);
+ }
+
+ @Override
+ public void rebuildCollectionIndex(UUID appId, String collection, Object object) {
+ factory.rebuildCollectionIndex(appId, collection, object);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index bf4e47e..d962d2a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -117,17 +117,20 @@ public interface EntityManagerFactory {
public void refreshIndex();
+ public void rebuildAllIndexes( ProgressObserver po ) throws Exception;
+
public void rebuildInternalIndexes( ProgressObserver po ) throws Exception;
- public void rebuildCollectionIndex(
- UUID appId, String collectionName, ProgressObserver po ) throws Exception;
+ public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception;
public void setApplicationContext(ApplicationContext ac);
/** For testing purposes */
public void flushEntityManagerCaches();
+ public void rebuildCollectionIndex(UUID appId, String collection, Object object);
+
public interface ProgressObserver {
- public void onProgress();
+ public void onProgress( EntityRef source, EntityRef target, String edgeType );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 987d39a..1e3b1c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -28,9 +28,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +64,6 @@ import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
-import me.prettyprint.hector.api.query.CountQuery;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
@@ -84,7 +80,6 @@ import static org.apache.commons.collections.MapUtils.getIntValue;
import static org.apache.commons.collections.MapUtils.getString;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index 89d8a07..955e707 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -56,7 +56,6 @@ import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
-import org.apache.usergrid.persistence.Results;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
@@ -68,8 +67,6 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.PROPERT
import static org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
import static org.apache.usergrid.utils.ConversionUtils.uuid;
import static org.apache.usergrid.persistence.cassandra.Serializers.*;
-import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
-import org.apache.usergrid.persistence.index.query.Query;
/**
@@ -423,49 +420,22 @@ public class EntityManagerFactoryImpl implements EntityManagerFactory, Applicati
}
@Override
- public void rebuildCollectionIndex(UUID appId, String collectionName, ProgressObserver po )
- throws Exception {
-
- logger.info( "Reindexing collection: {} for app id: {}", collectionName, appId );
-
- EntityManager em = getEntityManager( appId );
- Application app = em.getApplication();
-
- // search for all orgs
-
- Query query = new Query();
- query.setLimit(REBUILD_PAGE_SIZE );
- Results r = null;
-
- do {
-
- r = em.searchCollection( app, collectionName, query );
-
- for ( org.apache.usergrid.persistence.Entity entity : r.getEntities() ) {
- logger.info( "Updating entity type: {} with id: {} for app id: {}", new Object[] {
- entity.getType(), entity.getUuid(), appId
- } );
-
- try {
- em.update( entity );
- }
- catch ( DuplicateUniquePropertyExistsException dupee ) {
- logger.error( "duplicate property for type: {} with id: {} for app id: {}. "
- + "Property name: {} , value: {}", new Object[] {
- entity.getType(), entity.getUuid(), appId, dupee.getPropertyName(),
- dupee.getPropertyValue()
- } );
- }
- }
+ public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported.");
+ }
- query.setCursor( r.getCursor() );
- }
- while ( r != null && r.size() == REBUILD_PAGE_SIZE );
+ @Override
+ public void rebuildAllIndexes(ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+ @Override
+ public void rebuildApplicationIndexes(UUID appId, ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported.");
}
@Override
- public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
- // no op
+ public void rebuildCollectionIndex(UUID appId, String collection, Object object) {
+ throw new UnsupportedOperationException("Not supported.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 35d1d50..7af66d3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -17,9 +17,6 @@
package org.apache.usergrid.persistence;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@@ -37,7 +34,11 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.usergrid.cassandra.Concurrent;
+import static org.junit.Assert.fail;
//@RunWith(JukitoRunner.class)
@@ -47,21 +48,21 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
private static final MetricRegistry registry = new MetricRegistry();
+ private Slf4jReporter reporter;
private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
- private static final long writeDelayMs = 7;
- private static final long readDelayMs = 7;
+ private static final long writeDelayMs = 9;
+ //private static final long readDelayMs = 7;
@Rule
public Application app = new CoreApplication( setup );
- private Slf4jReporter reporter;
-
@Before
public void startReporting() {
+ logger.debug("Starting metrics reporting");
reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
.convertRatesTo( TimeUnit.SECONDS )
.convertDurationsTo( TimeUnit.MILLISECONDS ).build();
@@ -72,13 +73,15 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
@After
public void printReport() {
+
+ logger.debug("Printing metrics report");
reporter.report();
reporter.stop();
}
@Test
- public void rebuildIndex() throws Exception {
+ public void rebuildIndex() {
logger.info("Started rebuildIndex()");
@@ -96,34 +99,55 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
while ( System.currentTimeMillis() < stopTime ) {
entityMap.put( "key", i );
- final Entity created = em.create("testType", entityMap );
+ final Entity created;
+ try {
+ created = em.create("testType", entityMap );
+ } catch (Exception ex) {
+ throw new RuntimeException("Error creating entity", ex);
+ }
entityRefs.add( new SimpleEntityRef( created.getType(), created.getUuid() ) );
+ if ( i % 100 == 0 ) {
+ logger.info("Created {} entities", i );
+ }
i++;
- if ( i % 1000 == 0 ) {
- logger.debug("rebuildIndex() Created {} entities",i );
- }
- Thread.sleep( writeDelayMs );
+ try { Thread.sleep( writeDelayMs ); } catch (InterruptedException ignored ) {}
}
- logger.info("rebuildIndex() Created {} entities", i);
+ logger.info("Created {} entities", i);
+
final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
final Meter meter = registry.meter( meterName );
-
+
EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ int counter = 0;
@Override
- public void onProgress() {
+ public void onProgress( EntityRef s, EntityRef t, String etype ) {
+
meter.mark();
+
+ logger.debug("Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
+ s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+
+ if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
+ logger.info("Reindexed {} entities", counter );
+ }
+ counter++;
}
};
- setup.getEmf().rebuildInternalIndexes( po );
+ try {
+ setup.getEmf().rebuildAllIndexes( po );
- setup.getEmf().rebuildCollectionIndex( app.getId(), "testTypes", po);
+ registry.remove( meterName );
+ logger.info("Finished rebuildIndex()");
+
+ } catch (Exception ex) {
+ logger.error("Error rebuilding index", ex);
+ fail();
+ }
- registry.remove( meterName );
- logger.info("Finished rebuildIndex()");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
index 81d0c1c..66a5cfb 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
@@ -33,12 +33,13 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
/**
- * This is a utility to load all entities in an application and re-save them, this forces
- * the secondary indexing to be updated.
+ * Index rebuild utility for Usergrid. Can be used to rebuild the index for a specific
+ * application, a specific application's collection or for an entire Usergrid system.
*/
public class IndexRebuild extends ToolBase {
@@ -46,6 +47,8 @@ public class IndexRebuild extends ToolBase {
private static final String COLLECTION_ARG = "col";
+ private static final String ALL_ARG = "all";
+
private static final int PAGE_SIZE = 100;
@@ -71,12 +74,16 @@ public class IndexRebuild extends ToolBase {
Option collOpt = OptionBuilder.withArgName( COLLECTION_ARG ).hasArg().isRequired( false )
.withDescription( "Collection name" ).create( COLLECTION_ARG );
+ Option allOpt = OptionBuilder.withType( Boolean.class ).withArgName( ALL_ARG ).hasArg().isRequired( false )
+ .withDescription( "True to reindex all application" ).create( ALL_ARG );
+
Options options = new Options();
options.addOption( hostOpt );
options.addOption( esHostsOpt );
options.addOption( esClusterOpt );
options.addOption( appOpt );
options.addOption( collOpt );
+ options.addOption( allOpt );
return options;
}
@@ -94,20 +101,40 @@ public class IndexRebuild extends ToolBase {
logger.info( "Starting index rebuild" );
- emf.rebuildInternalIndexes( null );
+ EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ @Override
+ public void onProgress(EntityRef s, EntityRef t, String etype) {
+ logger.info("Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
+ s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+ }
+ };
+
+ emf.rebuildInternalIndexes( po );
emf.refreshIndex();
- /**
- * Goes through each app id specified
- */
- for ( UUID appId : getAppIds( line ) ) {
+ if ( line.getOptionValue("all") != null && line.getOptionValue("all").equalsIgnoreCase("true") ) {
+ emf.rebuildAllIndexes( po );
+
+ } else if ( line.getOptionValue( APPLICATION_ARG ) != null ) {
+
+ // Goes through each app id specified
+ for ( UUID appId : getAppIds( line ) ) {
+
+ logger.info( "Reindexing for app id: {}", appId );
+ Set<String> collections = getCollections( line, appId );
- logger.info( "Reindexing for app id: {}", appId );
- Set<String> collections = getCollections( line, appId );
+ for ( String collection : collections ) {
+ emf.rebuildCollectionIndex( appId, collection, po );
+ emf.refreshIndex();
+ }
+ }
+
+ } else {
- for ( String collection : collections ) {
- emf.rebuildCollectionIndex( appId, collection, null );
- emf.refreshIndex();
+ Map<String, UUID> ids = emf.getApplications();
+ System.out.println( "Printing all apps" );
+ for ( Entry<String, UUID> entry : ids.entrySet() ) {
+ System.out.println( entry.getKey() + " appid=" + entry.getValue() );
}
}
@@ -117,27 +144,20 @@ public class IndexRebuild extends ToolBase {
/** Get all app id */
private Collection<UUID> getAppIds( CommandLine line ) throws Exception {
+
String appId = line.getOptionValue( APPLICATION_ARG );
Map<String, UUID> ids = emf.getApplications();
if ( appId != null ) {
-
UUID id = UUIDUtils.tryExtractUUID( appId );
-
if ( id == null ) {
logger.debug("Got applications: " + ids );
id = emf.getApplications().get( appId );
}
-
return Collections.singleton( id );
}
- System.out.println( "Printing all apps" );
- for ( Entry<String, UUID> entry : ids.entrySet() ) {
- System.out.println( entry.getKey() + " appid=" + entry.getValue() );
- }
-
return ids.values();
}
[04/13] git commit: Changes to 1) ensure that the CpEntityManager's
ElasticSearch can be rebuilt entirely from data stored in Cassandra,
2) provide support Index Rebuild in two-dot-o and the beginnings of an Index
Rebuild test.
Posted by to...@apache.org.
Changes to 1) ensure that the CpEntityManager's ElasticSearch can be rebuilt entirely from data stored in Cassandra, 2) provide support Index Rebuild in two-dot-o and the beginnings of an Index Rebuild test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1aa04a71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1aa04a71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1aa04a71
Branch: refs/heads/esbatching
Commit: 1aa04a71d73b30f647b6d2d2ea8ce068557da31a
Parents: 58fc540
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 10:47:52 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 10:47:52 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 479 +++++++++++---
.../corepersistence/CpEntityManagerFactory.java | 639 ++++++-------------
.../corepersistence/CpRelationManager.java | 356 ++++-------
.../usergrid/corepersistence/CpSetup.java | 55 +-
.../HybridEntityManagerFactory.java | 15 +-
.../persistence/EntityManagerFactory.java | 9 +-
.../persistence/cassandra/CassandraService.java | 5 -
.../cassandra/EntityManagerFactoryImpl.java | 54 +-
.../PerformanceEntityRebuildIndexTest.java | 64 +-
.../org/apache/usergrid/tools/IndexRebuild.java | 62 +-
10 files changed, 835 insertions(+), 903 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 51e660b..1c54d5f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -16,9 +16,13 @@
package org.apache.usergrid.corepersistence;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
+import com.yammer.metrics.annotation.Metered;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import static java.util.Arrays.asList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -32,11 +36,26 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.CounterRow;
+import me.prettyprint.hector.api.beans.CounterRows;
+import me.prettyprint.hector.api.beans.CounterSlice;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.SliceCounterQuery;
+import static org.apache.commons.lang.StringUtils.capitalize;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.usergrid.corepersistence.CpRelationManager.ALL_TYPES;
+import static org.apache.usergrid.corepersistence.CpRelationManager.EDGE_COLL_SUFFIX;
+import static org.apache.usergrid.corepersistence.CpRelationManager.EDGE_CONN_SUFFIX;
import org.apache.usergrid.persistence.AggregateCounter;
import org.apache.usergrid.persistence.AggregateCounterSet;
import org.apache.usergrid.persistence.CollectionRef;
@@ -50,14 +69,43 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.Schema;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import org.apache.usergrid.persistence.SimpleEntityRef;
+import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid;
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.cassandra.ApplicationCF;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
import org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils;
import org.apache.usergrid.persistence.cassandra.CassandraService;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.GeoIndexManager;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.persistence.cassandra.Serializers.le;
+import static org.apache.usergrid.persistence.cassandra.Serializers.se;
+import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -73,6 +121,11 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
@@ -80,76 +133,27 @@ import org.apache.usergrid.persistence.index.query.CounterResolution;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.Level;
+import static org.apache.usergrid.persistence.index.query.Query.Level.REFS;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.ClassUtils;
-import org.apache.usergrid.utils.CompositeUtils;
-import org.apache.usergrid.utils.StringUtils;
-import org.apache.usergrid.utils.UUIDUtils;
-
-import com.netflix.hystrix.exception.HystrixRuntimeException;
-import com.yammer.metrics.annotation.Metered;
-
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.CounterRow;
-import me.prettyprint.hector.api.beans.CounterRows;
-import me.prettyprint.hector.api.beans.CounterSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HCounterColumn;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.SliceCounterQuery;
-import rx.Observable;
-
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
-import static java.util.Arrays.asList;
-
-import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static org.apache.commons.lang.StringUtils.capitalize;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.usergrid.corepersistence.CpRelationManager.ALL_TYPES;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
-import org.apache.usergrid.persistence.Schema;
-import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid;
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-import org.apache.usergrid.persistence.SimpleRoleRef;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
-import static org.apache.usergrid.persistence.cassandra.Serializers.le;
-import static org.apache.usergrid.persistence.cassandra.Serializers.se;
-import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
-import static org.apache.usergrid.persistence.index.query.Query.Level.REFS;
import static org.apache.usergrid.utils.ClassUtils.cast;
+import org.apache.usergrid.utils.CompositeUtils;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.ConversionUtils.getLong;
import static org.apache.usergrid.utils.ConversionUtils.object;
import static org.apache.usergrid.utils.ConversionUtils.string;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
+import org.apache.usergrid.utils.StringUtils;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+import rx.Observable;
+import rx.functions.Action1;
@@ -172,7 +176,7 @@ public class CpEntityManager implements EntityManager {
private CpManagerCache managerCache;
- private ApplicationScope appScope;
+ private ApplicationScope applicationScope;
private CassandraService cass;
@@ -191,7 +195,7 @@ public class CpEntityManager implements EntityManager {
this.managerCache = this.emf.getManagerCache();
this.applicationId = applicationId;
- appScope = this.emf.getApplicationScope( applicationId );
+ applicationScope = this.emf.getApplicationScope( applicationId );
this.cass = this.emf.cass;
this.counterUtils = this.emf.counterUtils;
@@ -200,7 +204,7 @@ public class CpEntityManager implements EntityManager {
this.skipAggregateCounters = false;
- appScope = this.emf.getApplicationScope( applicationId );
+ applicationScope = this.emf.getApplicationScope( applicationId );
}
@@ -226,6 +230,66 @@ public class CpEntityManager implements EntityManager {
return csn.toLowerCase();
}
+ static boolean isCollectionEdgeType( String type ) {
+ return type.endsWith( EDGE_COLL_SUFFIX );
+ }
+
+ static boolean isConnectionEdgeType( String type ) {
+ return type.endsWith( EDGE_CONN_SUFFIX );
+ }
+
+ static public String getConnectionType( String edgeType ) {
+ String[] parts = edgeType.split("\\|");
+ return parts[0];
+ }
+
+ static public String getCollectionEntityType( String edgeType ) {
+ String[] parts = edgeType.split("\\|");
+ return parts[1];
+ }
+
+ static public String getCollectionName( String edgeType ) {
+ String[] parts = edgeType.split("\\|");
+ return parts[0];
+ }
+
+ static public String getConnectedEntityType( String edgeType ) {
+ String[] parts = edgeType.split("\\|");
+ return parts[1];
+ }
+
+ static String getEdgeTypeFromConnectionType( String connectionType, String targetEntityType ) {
+
+ if ( connectionType != null && targetEntityType != null ) {
+ String csn = connectionType + "|" + targetEntityType + "|" + EDGE_CONN_SUFFIX;
+ return csn;
+ }
+
+ if ( connectionType != null ) {
+ // no suffix, this must be a search
+ String csn = connectionType;
+ return csn;
+ }
+
+ return null;
+ }
+
+ static String getEdgeTypeFromCollectionName( String collectionName, String targetEntityType ) {
+
+ if ( collectionName != null && targetEntityType != null ) {
+ String csn = collectionName + "|" + targetEntityType + "|" + EDGE_COLL_SUFFIX;
+ return csn;
+ }
+
+ if ( collectionName != null ) {
+ // no suffix, this must be a search
+ String csn = collectionName;
+ return csn;
+ }
+
+ return null;
+ }
+
@Override
public Entity create( String entityType, Map<String, Object> properties ) throws Exception {
@@ -318,7 +382,7 @@ public class CpEntityManager implements EntityManager {
String collectionName = getCollectionScopeNameFromEntityType( entityRef.getType() );
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(), appScope.getApplication(), collectionName );
+ applicationScope.getApplication(), applicationScope.getApplication(), collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -405,7 +469,7 @@ public class CpEntityManager implements EntityManager {
String collectionName = getCollectionScopeNameFromEntityType( type );
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(), appScope.getApplication(), collectionName );
+ applicationScope.getApplication(), applicationScope.getApplication(), collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -471,8 +535,8 @@ public class CpEntityManager implements EntityManager {
// first, update entity index in its own collection scope
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( entity.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -534,8 +598,8 @@ public class CpEntityManager implements EntityManager {
private Observable deleteAsync( EntityRef entityRef ) throws Exception {
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -568,7 +632,7 @@ public class CpEntityManager implements EntityManager {
for ( String coll : collectionNames ) {
IndexScope indexScope = new IndexScopeImpl(
- appScope.getApplication(),
+ applicationScope.getApplication(),
new SimpleId( uuid, ownerType ),
CpEntityManager.getCollectionScopeNameFromCollectionName(coll) );
@@ -581,15 +645,15 @@ public class CpEntityManager implements EntityManager {
// deindex from default index scope
IndexScope defaultIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityIndex entityIndex = managerCache.getEntityIndex( defaultIndexScope );
entityIndex.deindex( entity );
IndexScope allTypesIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
ALL_TYPES);
EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
aei.deindex( entity );
@@ -969,13 +1033,13 @@ public class CpEntityManager implements EntityManager {
String collectionName = getCollectionScopeNameFromEntityType( entityRef.getType() );
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
collectionName );
IndexScope defaultIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( entityRef.getType()) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -1238,7 +1302,8 @@ public class CpEntityManager implements EntityManager {
ColumnSlice<ByteBuffer, ByteBuffer> results =
cass.getColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
- CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ), columnNames, be, be );
+ CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
+ columnNames, be, be );
if ( results != null ) {
values = new HashMap<String, Object>();
for ( HColumn<ByteBuffer, ByteBuffer> result : results.getColumns() ) {
@@ -1618,11 +1683,14 @@ public class CpEntityManager implements EntityManager {
Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLENAMES ), roleName, roleTitle, timestamp );
+ CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLENAMES ),
+ roleName, roleTitle, timestamp );
CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLETIMES ), roleName, inactivity, timestamp );
+ CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLETIMES ),
+ roleName, inactivity, timestamp );
CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, DICTIONARY_SETS ), Schema.DICTIONARY_ROLENAMES, null, timestamp );
+ CassandraPersistenceUtils.key( ownerId, DICTIONARY_SETS ),
+ Schema.DICTIONARY_ROLENAMES, null, timestamp );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -1662,13 +1730,15 @@ public class CpEntityManager implements EntityManager {
private Object getRolePermissionsKey( String roleName ) {
- return CassandraPersistenceUtils.key( SimpleRoleRef.getIdForRoleName( roleName ), DICTIONARY_PERMISSIONS );
+ return CassandraPersistenceUtils.key( SimpleRoleRef.getIdForRoleName( roleName ),
+ DICTIONARY_PERMISSIONS );
}
private Object getRolePermissionsKey( UUID groupId, String roleName ) {
try {
- return CassandraPersistenceUtils.key( getGroupRoleRef( groupId, roleName ).getUuid(), DICTIONARY_PERMISSIONS );
+ return CassandraPersistenceUtils.key( getGroupRoleRef( groupId, roleName ).getUuid(),
+ DICTIONARY_PERMISSIONS );
} catch ( Exception e ) {
logger.error("Error creating role key for uuid {} and role {}", groupId, roleName );
return null;
@@ -2485,8 +2555,8 @@ public class CpEntityManager implements EntityManager {
// prepare to write and index Core Persistence Entity into default scope
CollectionScope collectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( eType ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -2530,8 +2600,8 @@ public class CpEntityManager implements EntityManager {
// Index CP entity into default collection scope
IndexScope defaultIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
ei.index( cpEntity );
@@ -2644,16 +2714,18 @@ public class CpEntityManager implements EntityManager {
if ( !removeFromDictionary ) {
// Set the new value
- elementCoValue = CassandraPersistenceUtils.toStorableBinaryValue( elementCoValue, !entityHasDictionary );
+ elementCoValue = CassandraPersistenceUtils.toStorableBinaryValue(
+ elementCoValue, !entityHasDictionary );
- CassandraPersistenceUtils.addInsertToMutator(
- batch, dictionary_cf, CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
+ CassandraPersistenceUtils.addInsertToMutator( batch, dictionary_cf,
+ CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
entityHasDictionary
? elementValue : asList( elementValue ), elementCoValue, timestamp );
if ( !entityHasDictionary ) {
CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( entity.getUuid(), DICTIONARY_SETS ), dictionaryName, null, timestamp );
+ CassandraPersistenceUtils.key( entity.getUuid(), DICTIONARY_SETS ),
+ dictionaryName, null, timestamp );
}
}
else {
@@ -2749,14 +2821,15 @@ public class CpEntityManager implements EntityManager {
// refresh this Entity Manager's application's index
IndexScope indexScope = new IndexScopeImpl(
- appScope.getApplication(), appScope.getApplication(), "dummy" );
+ applicationScope.getApplication(), applicationScope.getApplication(), "dummy" );
EntityIndex ei = managerCache.getEntityIndex( indexScope );
ei.refresh();
}
- public static org.apache.usergrid.persistence.model.entity.Entity entityToCpEntity( Entity entity, UUID importId ) {
+ public static org.apache.usergrid.persistence.model.entity.Entity
+ entityToCpEntity( Entity entity, UUID importId ) {
UUID uuid = importId != null ? importId : entity.getUuid();
@@ -2777,6 +2850,216 @@ public class CpEntityManager implements EntityManager {
public void flushManagerCaches() {
managerCache.flush();
}
+
+
+ /**
+ * Completely reindex the application associated with this EntityManager.
+ */
+ public void reindex( EntityManagerFactory.ProgressObserver po ) throws Exception {
+ indexEntityConnectionsAndCollections( getApplication(), po );
+ }
+
+
+ /**
+ * Recursively index (or reindex) all of the collections and connections of a
+ * specified entity, and all of the collected and connected entities as well.
+ */
+ private void indexEntityConnectionsAndCollections(
+ final EntityRef entity, final EntityManagerFactory.ProgressObserver po ) {
+
+ final GraphManager gm = managerCache.getGraphManager(applicationScope);
+
+ final Id fromEntityId = new SimpleId( entity.getUuid(), entity.getType() );
+
+ logger.debug("Loading edges types from {}:{}\n scope {}:{}",
+ new Object[] { entity.getType(), entity.getUuid(),
+ applicationScope.getApplication().getType(),
+ applicationScope.getApplication().getUuid() } );
+
+ Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
+ new SimpleSearchEdgeType( fromEntityId, null , null ));
+
+ edgeTypes.forEach( new Action1<String>() {
+
+ @Override
+ public void call( final String edgeType ) {
+
+ logger.debug("Loading edges of edgeType {} from {}:{}\n scope {}:{}",
+ new Object[] { edgeType, entity.getType(), entity.getUuid(),
+ applicationScope.getApplication().getType(),
+ applicationScope.getApplication().getUuid() } );
+
+ Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
+ fromEntityId, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ));
+
+ edges.forEach( new Action1<Edge>() {
+
+ @Override
+ public void call( Edge edge ) {
+
+ if ( isCollectionEdgeType( edge.getType() )) {
+
+ String collName = getCollectionName(edgeType);
+ String memberType = getCollectionEntityType(edgeType);
+
+ CollectionScope collScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpEntityManager.getCollectionScopeNameFromCollectionName(collName));
+ EntityCollectionManager collMgr =
+ managerCache.getEntityCollectionManager(collScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity collEntity =
+ collMgr.load( edge.getTargetNode()).toBlockingObservable().last();
+
+ CollectionScope memberScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpEntityManager.getCollectionScopeNameFromEntityType(memberType));
+ EntityCollectionManager memberMgr =
+ managerCache.getEntityCollectionManager(memberScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+ memberMgr.load( edge.getTargetNode()).toBlockingObservable().last();
+
+ indexEntityIntoCollections( collEntity, memberEntity, collName, true );
+
+ EntityRef ref = new SimpleEntityRef(
+ memberEntity.getId().getType(), memberEntity.getId().getUuid());
+ po.onProgress( entity, ref, edge.getType());
+
+ // recursion
+ indexEntityConnectionsAndCollections( new SimpleEntityRef(
+ memberEntity.getId().getType(), memberEntity.getId().getUuid()),po);
+
+ } else if ( isConnectionEdgeType( edge.getType() )) {
+
+ String connType = getConnectionType(edgeType);
+ String targetEntityType = getConnectedEntityType(edgeType);
+ String sourceEntityType = entity.getType();
+
+ CollectionScope sourceScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpEntityManager.getCollectionScopeNameFromEntityType(sourceEntityType));
+ EntityCollectionManager sourceEcm =
+ managerCache.getEntityCollectionManager(sourceScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity sourceEntity =
+ sourceEcm.load( edge.getTargetNode()).toBlockingObservable().last();
+
+ CollectionScope targetScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpEntityManager.getCollectionScopeNameFromEntityType(targetEntityType));
+ EntityCollectionManager targetEcm =
+ managerCache.getEntityCollectionManager(targetScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+ targetEcm.load( edge.getTargetNode() ).toBlockingObservable().last();
+
+ indexEntityIntoConnection(
+ sourceEntity, targetEntity, targetEntityType, connType );
+
+ EntityRef ref = new SimpleEntityRef(
+ targetEntity.getId().getType(), targetEntity.getId().getUuid());
+ po.onProgress( entity, ref, edge.getType());
+
+ // recursion
+ indexEntityConnectionsAndCollections( new SimpleEntityRef(
+ targetEntity.getId().getType(), targetEntity.getId().getUuid()),po);
+ }
+ }
+
+ }); // end foreach on edges
+
+ }
+
+ }); // end foreach on edgeTypes
+
+ }
+
+
+ private void indexEntityIntoCollections(
+ org.apache.usergrid.persistence.model.entity.Entity collectionEntity,
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity,
+ String collName,
+ boolean connectBack ) {
+
+ logger.debug("Indexing into collections {} {}:{} member {}:{}", new Object[] {
+ collName, collectionEntity.getId().getType(), collectionEntity.getId().getUuid(),
+ memberEntity.getId().getType(), memberEntity.getId().getUuid() });
+
+ indexEntityIntoCollection( collectionEntity, memberEntity, collName );
+
+ CollectionInfo collection = getDefaultSchema()
+ .getCollection( memberEntity.getId().getType(), collName);
+
+ if (connectBack && collection != null && collection.getLinkedCollection() != null) {
+ indexEntityIntoCollections(
+ memberEntity, collectionEntity, collection.getLinkedCollection(), false );
+ }
+ }
+
+
+ void indexEntityIntoConnection(
+ org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity,
+ String targetEntityType,
+ String connType ) {
+
+ logger.debug("Indexing into connection {} source {}:{} target {}:{}", new Object[] {
+ connType, sourceEntity.getId().getType(), sourceEntity.getId().getUuid(),
+ targetEntity.getId().getType(), targetEntity.getId().getUuid() });
+
+ // Index the new connection in app|source|type context
+ IndexScope indexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ sourceEntity.getId(),
+ CpEntityManager.getConnectionScopeName(targetEntityType, connType));
+ EntityIndex ei = managerCache.getEntityIndex(indexScope);
+ ei.index(targetEntity);
+
+ // Index the new connection in app|scope|all-types context
+ IndexScope allTypesIndexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ sourceEntity.getId(),
+ ALL_TYPES);
+ EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
+ aei.index(targetEntity);
+ }
+
+
+ void indexEntityIntoCollection(
+ org.apache.usergrid.persistence.model.entity.Entity collectionEntity,
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity,
+ String collName ) {
+
+ // index member into entity collection | type scope
+ IndexScope collectionIndexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ collectionEntity.getId(),
+ CpEntityManager.getCollectionScopeNameFromCollectionName(collName));
+ EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope);
+ collectionIndex.index(memberEntity);
+
+ // index member into entity | all-types scope
+ IndexScope entityAllTypesScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ collectionEntity.getId(),
+ ALL_TYPES);
+ EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope);
+ entityAllCollectionIndex.index(memberEntity);
+
+ // index member into application | all-types scope
+ IndexScope appAllTypesScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ ALL_TYPES);
+ EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope);
+ allCollectionIndex.index(memberEntity);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1aa04a71/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5ca9536..168c1bf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -24,47 +24,40 @@ import static java.lang.String.CASE_INSENSITIVE_ORDER;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.AbstractEntity;
import org.apache.usergrid.persistence.DynamicEntity;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.Results;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.Setup;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
-import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
+import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +86,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
// The System Application where we store app and org metadata
public static final UUID SYSTEM_APP_ID =
UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3");
-
+
public static final UUID MANAGEMENT_APPLICATION_ID =
UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8");
@@ -101,44 +94,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
- @Deprecated // use system app for these in future
- public static final String SYSTEM_APPS_TYPE = "zzzappszzz";
-
- @Deprecated
- public static final String SYSTEM_ORGS_TYPE = "zzzorgszzz";
-
- @Deprecated
- public static final String SYSTEM_PROPS_TYPE = "zzzpropszzz";
-
- @Deprecated // use system app for these in future
- private static final Id systemAppId =
- new SimpleId( SYSTEM_APP_ID, SYSTEM_APPS_TYPE );
-
- @Deprecated
- public static final CollectionScope SYSTEM_APPS_SCOPE =
- new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_APPS_TYPE );
-
- @Deprecated
- public static final IndexScope SYSTEM_APPS_INDEX_SCOPE =
- new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_APPS_TYPE);
-
- @Deprecated
- public static final CollectionScope SYSTEM_ORGS_SCOPE =
- new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_ORGS_TYPE);
-
- @Deprecated
- public static final IndexScope SYSTEM_ORGS_INDEX_SCOPE =
- new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_ORGS_TYPE);
-
- @Deprecated
- public static final CollectionScope SYSTEM_PROPS_SCOPE =
- new CollectionScopeImpl( systemAppId, systemAppId, SYSTEM_PROPS_TYPE);
-
- @Deprecated
- public static final IndexScope SYSTEM_PROPS_INDEX_SCOPE =
- new IndexScopeImpl( systemAppId, systemAppId, SYSTEM_PROPS_TYPE);
-
-
// cache of already instantiated entity managers
private LoadingCache<UUID, EntityManager> entityManagers
= CacheBuilder.newBuilder().maximumSize(100).build(new CacheLoader<UUID, EntityManager>() {
@@ -168,21 +123,28 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
logger.warn("NOTE: Counters have been disabled by configuration...");
}
- // if system app does have apps, orgs and props then populate it
+ }
+
+
+ private void init() {
+
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+
try {
- EntityManager em = getEntityManager(SYSTEM_APP_ID);
- Results orgs = em.searchCollection(em.getApplicationRef(), "organizations", null);
- if (orgs.isEmpty()) {
- populateSystemAppsFromEs();
- populateSystemOrgsFromEs();
- populateSystemPropsFromEs();
+ if ( em.getApplication() == null ) {
+ logger.info("Creating system application");
+ Map sysAppProps = new HashMap<String, Object>();
+ sysAppProps.put( PROPERTY_NAME, "systemapp");
+ em.create(SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps );
+ em.getApplication();
+ em.refreshIndex();
}
} catch (Exception ex) {
- throw new RuntimeException("Fatal error migrating data", ex);
+ throw new RuntimeException("Fatal error creating system application", ex);
}
}
-
+
public CpManagerCache getManagerCache() {
@@ -267,7 +229,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public UUID initializeApplication( String organizationName, UUID applicationId, String name,
Map<String, Object> properties ) throws Exception {
- String appName = buildAppName( organizationName, name );
+
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+
+ final String appName = buildAppName( organizationName, name );
// check for pre-existing application
if ( lookupApplication( appName ) != null ) {
@@ -278,58 +243,38 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
UUID orgUuid = lookupOrganization( organizationName );
if ( orgUuid == null ) {
-
- // organization does not exist, create it.
- Entity orgInfoEntity = new Entity(generateOrgId( UUIDGenerator.newTimeUUID() ));
- orgUuid = orgInfoEntity.getId().getUuid();
-
- long timestamp = System.currentTimeMillis();
- orgInfoEntity.setField( new LongField( PROPERTY_CREATED, (long)(timestamp / 1000)));
- orgInfoEntity.setField( new StringField( PROPERTY_NAME, name ));
- orgInfoEntity.setField( new UUIDField( PROPERTY_UUID, orgUuid ));
-
- EntityCollectionManager ecm = getManagerCache()
- .getEntityCollectionManager( SYSTEM_ORGS_SCOPE );
- EntityIndex eci = getManagerCache()
- .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
-
- orgInfoEntity = ecm.write( orgInfoEntity ).toBlockingObservable().last();
- eci.index( orgInfoEntity );
- eci.refresh();
+ // create new org because the specified one does not exist
+ final String orgName = organizationName;
+ Entity orgInfo = em.create("organization", new HashMap<String, Object>() {{
+ put( PROPERTY_NAME, orgName );
+ }});
+ em.refreshIndex();
+ orgUuid = orgInfo.getUuid();
}
+ // create appinfo entry in the system app
+ final UUID appId = applicationId;
+ final UUID orgId = orgUuid;
+ Map<String, Object> appInfoMap = new HashMap<String, Object>() {{
+ put( PROPERTY_NAME, appName );
+ put( "applicationUuid", appId );
+ put( "organizationUuid", orgId );
+ }};
+ Entity appInfo = em.create( "appinfo", appInfoMap );
+ em.refreshIndex();
+
+ // create application entity
if ( properties == null ) {
properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
}
properties.put( PROPERTY_NAME, appName );
+ EntityManager appEm = getEntityManager( applicationId );
+ appEm.create( applicationId, TYPE_APPLICATION, properties );
+ appEm.resetRoles();
+ appEm.refreshIndex();
- Entity appInfoEntity = new Entity( generateApplicationId( applicationId ));
-
- long timestamp = System.currentTimeMillis();
- appInfoEntity.setField( new LongField( PROPERTY_CREATED, (long)(timestamp / 1000)));
- appInfoEntity.setField( new StringField( PROPERTY_NAME, name ));
- appInfoEntity.setField( new UUIDField( "applicationUuid", applicationId ));
- appInfoEntity.setField( new UUIDField( "organizationUuid", orgUuid ));
-
- // create app in system app scope
- {
- EntityCollectionManager ecm = getManagerCache()
- .getEntityCollectionManager(SYSTEM_APPS_SCOPE );
- EntityIndex eci = getManagerCache()
- .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
-
- appInfoEntity = ecm.write( appInfoEntity ).toBlockingObservable().last();
- eci.index( appInfoEntity );
- eci.refresh();
- }
-
- // create app in its own scope
- EntityManager em = getEntityManager( applicationId );
- em.create( applicationId, TYPE_APPLICATION, properties );
- em.resetRoles();
- em.refreshIndex();
-
+ logger.info("Initialized application {}", appName );
return applicationId;
}
@@ -337,8 +282,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public ApplicationScope getApplicationScope( UUID applicationId ) {
// We can always generate a scope, it doesn't matter if the application exists yet or not.
-
- final ApplicationScopeImpl scope = new ApplicationScopeImpl( generateApplicationId( applicationId ) );
+ final ApplicationScopeImpl scope =
+ new ApplicationScopeImpl( generateApplicationId( applicationId ) );
return scope;
}
@@ -348,39 +293,40 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public UUID importApplication(
String organization, UUID applicationId,
String name, Map<String, Object> properties) throws Exception {
+
throw new UnsupportedOperationException("Not supported yet.");
}
public UUID lookupOrganization( String name) throws Exception {
+ init();
Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'");
-
- EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
- CandidateResults results = ei.search( q );
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+ Results results = em.searchCollection( em.getApplicationRef(), "organizations", q);
if ( results.isEmpty() ) {
return null;
}
- return results.iterator().next().getId().getUuid();
+ return results.iterator().next().getUuid();
}
@Override
- public UUID lookupApplication( String name) throws Exception {
+ public UUID lookupApplication( String name ) throws Exception {
+ init();
Query q = Query.fromQL( PROPERTY_NAME + " = '" + name + "'");
- EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
-
- CandidateResults results = ei.search( q );
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+ Results results = em.searchCollection( em.getApplicationRef(), "appinfos", q);
if ( results.isEmpty() ) {
return null;
}
- return results.iterator().next().getId().getUuid();
+ return results.iterator().next().getUuid();
}
@@ -388,45 +334,47 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Metered(group = "core", name = "EntityManagerFactory_getApplication")
public Map<String, UUID> getApplications() throws Exception {
- EntityCollectionManager em = getManagerCache()
- .getEntityCollectionManager(SYSTEM_APPS_SCOPE );
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
-
Map<String, UUID> appMap = new HashMap<String, UUID>();
- String cursor = null;
- boolean done = false;
+ ApplicationScope appScope = getApplicationScope(SYSTEM_APP_ID);
+ GraphManager gm = managerCache.getGraphManager(appScope);
- while ( !done ) {
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
+ Application app = em.getApplication();
+ Id fromEntityId = new SimpleId( app.getUuid(), app.getType() );
- Query q = Query.fromQL("select *");
- q.setCursor( cursor );
+ String edgeType = CpEntityManager.getEdgeTypeFromCollectionName("appinfos", "appinfo");
- CandidateResults results = ei.search( q );
- cursor = results.getCursor();
+ logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
+ new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } );
- Iterator<CandidateResult> iter = results.iterator();
- while ( iter.hasNext() ) {
+ Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
+ fromEntityId, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ));
+
+ Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+ while ( iter.hasNext() ) {
- CandidateResult cr = iter.next();
- Entity e = em.load( cr.getId() ).toBlockingObservable().last();
+ Edge edge = iter.next();
+ Id targetId = edge.getTargetNode();
- if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
- logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), e.getVersion()});
- continue;
- }
-
- appMap.put(
- (String)e.getField(PROPERTY_NAME).getValue(),
- (UUID)e.getField("applicationUuid").getValue() );
- }
+ logger.debug("getApplications(): Processing edge from {}:{} to {}:{}", new Object[] {
+ edge.getSourceNode().getType(), edge.getSourceNode().getUuid(),
+ edge.getTargetNode().getType(), edge.getTargetNode().getUuid()
+ });
- if ( cursor == null ) {
- done = true;
- }
+ CollectionScope collScope = new CollectionScopeImpl(
+ appScope.getApplication(),
+ appScope.getApplication(),
+ CpEntityManager.getCollectionScopeNameFromCollectionName("appinfos"));
+
+ org.apache.usergrid.persistence.model.entity.Entity e =
+ managerCache.getEntityCollectionManager( collScope ).load( targetId )
+ .toBlockingObservable().lastOrDefault(null);
+
+ appMap.put(
+ (String)e.getField( PROPERTY_NAME ).getValue(),
+ (UUID)e.getField( "applicationUuid" ).getValue());
}
return appMap;
@@ -442,30 +390,26 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public Map<String, String> getServiceProperties() {
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
- EntityCollectionManager em = getManagerCache()
- .getEntityCollectionManager( SYSTEM_PROPS_SCOPE );
+ Map<String, String> props = new HashMap<String,String>();
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
Query q = Query.fromQL("select *");
+ Results results = null;
+ try {
+ results = em.searchCollection( em.getApplicationRef(), "propertymaps", q);
- CandidateResults results = ei.search( q );
-
- if ( results.isEmpty() ) {
- return new HashMap<String,String>();
+ } catch (Exception ex) {
+ logger.error("Error getting system properties", ex);
}
- CandidateResult cr = results.iterator().next();
- Entity propsEntity = em.load( cr.getId() ).toBlockingObservable().last();
-
- Map<String, String> props = new HashMap<String, String>();
-
- // intentionally going only one-level deep into fields and treating all
- // values as strings because that is all we need for service properties.
- for ( Field f : propsEntity.getFields() ) {
- props.put( f.getName(), f.getValue().toString() );
+ if ( results == null || results.isEmpty() ) {
+ return props;
}
+ org.apache.usergrid.persistence.Entity e = results.getEntity();
+ for ( String key : e.getProperties().keySet() ) {
+ props.put( key, props.get(key).toString() );
+ }
return props;
}
@@ -473,30 +417,39 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public boolean updateServiceProperties(Map<String, String> properties) {
- EntityCollectionManager em = getManagerCache()
- .getEntityCollectionManager( SYSTEM_PROPS_SCOPE );
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
-
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
Query q = Query.fromQL("select *");
- CandidateResults results = ei.search( q );
- Entity propsEntity;
+ Results results = null;
+ try {
+ results = em.searchCollection( em.getApplicationRef(), "propertymaps", q);
+
+ } catch (Exception ex) {
+ logger.error("Error getting system properties", ex);
+ return false;
+ }
+
+ org.apache.usergrid.persistence.Entity propsEntity = null;
+
if ( !results.isEmpty() ) {
- propsEntity = em.load( results.iterator().next().getId()).toBlockingObservable().last();
+ propsEntity = results.getEntity();
+
} else {
- propsEntity = new Entity( new SimpleId( "properties" ));
- long timestamp = System.currentTimeMillis();
- propsEntity.setField( new LongField( PROPERTY_CREATED, (long)(timestamp / 1000)));
+ propsEntity = EntityFactory.newEntity( UUIDUtils.newTimeUUID(), "propertymap");
}
// intentionally going only one-level deep into fields and treating all
// values as strings because that is all we need for service properties
for ( String key : properties.keySet() ) {
- propsEntity.setField( new StringField(key, properties.get(key)) );
+ propsEntity.setProperty( key, properties.get(key).toString() );
}
- propsEntity = em.write( propsEntity ).toBlockingObservable().last();
- ei.index( propsEntity );
+ try {
+ em.update( propsEntity );
+
+ } catch (Exception ex) {
+ logger.error("Error updating service properties", ex);
+ return false;
+ }
return true;
}
@@ -513,27 +466,34 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public boolean deleteServiceProperty(String name) {
- EntityCollectionManager em = getManagerCache().getEntityCollectionManager( SYSTEM_PROPS_SCOPE );
- EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
-
+ EntityManager em = getEntityManager(SYSTEM_APP_ID);
Query q = Query.fromQL("select *");
- CandidateResults results = ei.search( q );
-
- Entity propsEntity = em.load(
- results.iterator().next().getId() ).toBlockingObservable().last();
+ Results results = null;
+ try {
+ results = em.searchCollection( em.getApplicationRef(), "propertymaps", q);
- if ( propsEntity == null ) {
- return false; // nothing to delete
+ } catch (Exception ex) {
+ logger.error("Error getting service property for delete of property: " + name, ex);
+ return false;
}
- if ( propsEntity.getField(name) == null ) {
- return false; // no such field
+ org.apache.usergrid.persistence.Entity propsEntity = null;
+
+ if ( !results.isEmpty() ) {
+ propsEntity = results.getEntity();
+
+ } else {
+ propsEntity = EntityFactory.newEntity( UUIDUtils.newTimeUUID(), "propertymap");
}
- propsEntity.removeField( name );
+ try {
+ ((AbstractEntity)propsEntity).clearDataset( name );
+ em.update( propsEntity );
- propsEntity = em.write( propsEntity ).toBlockingObservable().last();
- ei.index( propsEntity );
+ } catch (Exception ex) {
+ logger.error("Error deleting service property name: " + name, ex);
+ return false;
+ }
return true;
}
@@ -565,15 +525,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
return MANAGEMENT_APPLICATION_ID;
}
+
@Override
public UUID getDefaultAppId() {
return DEFAULT_APPLICATION_ID;
}
- private Id generateOrgId(UUID id){
- return new SimpleId( id, "organization" );
- }
-
private Id generateApplicationId(UUID id){
return new SimpleId( id, Application.ENTITY_TYPE );
@@ -593,19 +550,21 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public void refreshIndex() {
- // refresh factory's indexes, will refresh all three index scopes
- managerCache.getEntityIndex( CpEntityManagerFactory.SYSTEM_APPS_INDEX_SCOPE ).refresh();
-
- // these are unecessary because of above call
- //managerCache.getEntityIndex( CpEntityManagerFactory.SYSTEM_ORGS_INDEX_SCOPE ).refresh();
- //managerCache.getEntityIndex( CpEntityManagerFactory.SYSTEM_PROPS_INDEX_SCOPE ).refresh();
-
// refresh special indexes without calling EntityManager refresh because stack overflow
+
+ // system app
+ IndexScope sscope = new IndexScopeImpl(
+ new SimpleId( SYSTEM_APP_ID, "application"),
+ new SimpleId( SYSTEM_APP_ID, "application"), "dummy");
+ managerCache.getEntityIndex( sscope ).refresh();
+
+ // default app
IndexScope mscope = new IndexScopeImpl(
new SimpleId( getManagementAppId(), "application"),
new SimpleId( getManagementAppId(), "application"), "dummy");
managerCache.getEntityIndex( mscope ).refresh();
+ // management app
IndexScope dscope = new IndexScopeImpl(
new SimpleId( getDefaultAppId(), "application"),
new SimpleId( getDefaultAppId(), "application"), "dummy");
@@ -613,137 +572,36 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
- public void rebuildInternalIndexes( ProgressObserver po ) throws Exception {
-
- // get all connections from systems app
-// GraphManager gm = managerCache.getGraphManager( CpEntityManagerFactory.SYSTEM_APPS_SCOPE );
-//
-// Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
-// new SimpleSearchEdgeType( systemAppId, null , null ));
-
- logger.info("Rebuilding system apps index");
- rebuildIndexScope(
- CpEntityManagerFactory.SYSTEM_APPS_SCOPE,
- CpEntityManagerFactory.SYSTEM_APPS_INDEX_SCOPE, po );
+ public void rebuildAllIndexes( ProgressObserver po ) throws Exception {
- logger.info("Rebuilding system orgs index");
- rebuildIndexScope(
- CpEntityManagerFactory.SYSTEM_ORGS_SCOPE,
- CpEntityManagerFactory.SYSTEM_ORGS_INDEX_SCOPE, po );
+ logger.info("\n\nRebuilding all indexes\n");
- logger.info("Rebuilding system props index");
- rebuildIndexScope(
- CpEntityManagerFactory.SYSTEM_PROPS_SCOPE,
- CpEntityManagerFactory.SYSTEM_PROPS_INDEX_SCOPE, po );
+ rebuildInternalIndexes( po );
- logger.info("Rebuilding management application index");
- rebuildApplicationIndex( MANAGEMENT_APPLICATION_ID, po );
+ Map<String, UUID> appMap = getApplications();
- logger.info("Rebuilding default application index");
- rebuildApplicationIndex( DEFAULT_APPLICATION_ID, po );
- }
-
-
- private void rebuildIndexScope( CollectionScope cs, IndexScope is, ProgressObserver po ) {
-
- logger.info("Rebuild index scope for {}:{}:{}", new Object[] {
- cs.getOwner(), cs.getApplication(), cs.getName()
- });
+ logger.info("About to rebuild indexes for {} applications", appMap.keySet().size());
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager( cs );
- EntityIndex ei = managerCache.getEntityIndex( is );
-
- Query q = Query.fromQL("select *");
- CandidateResults results = ei.search( q );
-
- Iterator<CandidateResult> iter = results.iterator();
- while (iter.hasNext()) {
- CandidateResult cr = iter.next();
-
- Entity entity = ecm.load(cr.getId()).toBlockingObservable().last();
-
- if ( cr.getVersion().compareTo( entity.getVersion()) < 0 ) {
- logger.warn(" Ignoring stale version uuid:{} type:{} state v:{} latest v:{}",
- new Object[] {
- cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), entity.getVersion()
- });
-
- } else {
-
- logger.info(" Updating entity type {} with id {} for app {}/{}", new Object[] {
- cr.getId().getType(), cr.getId().getUuid(), cs.getApplication().getUuid()
- });
-
- ei.index(entity);
-
- if ( po != null ) {
- po.onProgress();
- }
-
- }
+ for ( UUID appUuid : appMap.values() ) {
+ rebuildApplicationIndexes( appUuid, po );
}
}
+
-
- public void rebuildApplicationIndex( UUID appId, ProgressObserver po ) throws Exception {
-
- EntityManager em = getEntityManager( appId );
-
- Set<String> collections = em.getApplicationCollections();
-
- logger.debug("For app {} found {} collections: {}", new Object[] {
- appId, collections.size(), collections });
-
- for ( String collection : collections ) {
- rebuildCollectionIndex( appId, collection, po );
- }
+ @Override
+ public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
+ rebuildApplicationIndexes(SYSTEM_APP_ID, po);
}
- public void rebuildCollectionIndex( UUID appId, String collectionName, ProgressObserver po )
- throws Exception {
-
- logger.info( "Reindexing collection: {} for app id: {}", collectionName, appId );
-
+ @Override
+ public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception {
+
+ logger.info("Rebuilding index for application id {}", appId);
EntityManager em = getEntityManager( appId );
Application app = em.getApplication();
- // search for all orgs
-
- Query query = new Query();
- query.setLimit(REBUILD_PAGE_SIZE );
- Results r = null;
-
- do {
-
- r = em.searchCollection( app, collectionName, query );
-
- for ( org.apache.usergrid.persistence.Entity entity : r.getEntities() ) {
-
- logger.info( " Updating Entity name {}, type: {}, id: {} in app id: {}", new Object[] {
- entity.getName(), entity.getType(), entity.getUuid(), appId
- } );
-
- try {
- em.update( entity );
-
- if ( po != null ) {
- po.onProgress();
- }
- }
- catch ( DuplicateUniquePropertyExistsException dupee ) {
- logger.error( " Duplicate property for type: {} with id: {} for app id: {}. "
- + "Property name: {} , value: {}", new Object[] {
- entity.getType(), entity.getUuid(), appId, dupee.getPropertyName(),
- dupee.getPropertyValue()
- } );
- }
- }
-
- query.setCursor( r.getCursor() );
- }
- while ( r != null && r.size() == REBUILD_PAGE_SIZE );
+ ((CpEntityManager)em).reindex( po );
}
@@ -756,142 +614,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
}
-
- private void populateSystemOrgsFromEs() throws Exception {
-
- logger.info("Migrating system orgs");
-
- EntityCollectionManager ecm = getManagerCache()
- .getEntityCollectionManager(SYSTEM_ORGS_SCOPE);
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
-
- EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
-
- String cursor = null;
- boolean done = false;
-
- while ( !done ) {
-
- Query q = Query.fromQL("select *");
- q.setCursor( cursor );
-
- CandidateResults results = ei.search( q );
- cursor = results.getCursor();
-
- Iterator<CandidateResult> iter = results.iterator();
- while ( iter.hasNext() ) {
-
- CandidateResult cr = iter.next();
- Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
-
- if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
- logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), e.getVersion()});
- continue;
- }
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
- systemAppEm.create("organization", entityMap );
- }
-
- if ( cursor == null ) {
- done = true;
- }
- }
- }
-
-
- private void populateSystemAppsFromEs() throws Exception {
-
- logger.info("Migrating system apps");
-
- EntityCollectionManager ecm = getManagerCache()
- .getEntityCollectionManager(SYSTEM_APPS_SCOPE );
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
-
- EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
-
- String cursor = null;
- boolean done = false;
-
- while ( !done ) {
-
- Query q = Query.fromQL("select *");
- q.setCursor( cursor );
-
- CandidateResults results = ei.search( q );
- cursor = results.getCursor();
-
- Iterator<CandidateResult> iter = results.iterator();
- while ( iter.hasNext() ) {
-
- CandidateResult cr = iter.next();
- Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
-
- if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
- logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), e.getVersion()});
- continue;
- }
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
- systemAppEm.create("application", entityMap );
- }
-
- if ( cursor == null ) {
- done = true;
- }
- }
- }
-
-
- private void populateSystemPropsFromEs() throws Exception {
-
- logger.info("Migrating system props");
-
- EntityCollectionManager ecm = getManagerCache()
- .getEntityCollectionManager(SYSTEM_PROPS_SCOPE );
- EntityIndex ei = getManagerCache()
- .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
-
- EntityManager systemAppEm = getEntityManager(SYSTEM_APP_ID);
-
- String cursor = null;
- boolean done = false;
-
- while ( !done ) {
-
- Query q = Query.fromQL("select *");
- q.setCursor( cursor );
-
- CandidateResults results = ei.search( q );
- cursor = results.getCursor();
-
- Iterator<CandidateResult> iter = results.iterator();
- while ( iter.hasNext() ) {
-
- CandidateResult cr = iter.next();
- Entity e = ecm.load( cr.getId() ).toBlockingObservable().last();
-
- if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
- logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), e.getVersion()});
- continue;
- }
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
- systemAppEm.create("property", entityMap );
- }
-
- if ( cursor == null ) {
- done = true;
- }
- }
+ @Override
+ public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po ) {
}
-
}
[08/13] git commit: Couple of test fixes.
Posted by to...@apache.org.
Couple of test fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3ec144f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3ec144f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3ec144f3
Branch: refs/heads/esbatching
Commit: 3ec144f3b5c7d76c225e9774d08b41de5f94ef54
Parents: df8c6b5
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 15:57:37 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 15:57:37 2014 -0400
----------------------------------------------------------------------
.../usergrid/persistence/PerformanceEntityRebuildIndexTest.java | 2 +-
.../usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ec144f3/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 663de59..0f033a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -102,7 +102,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- create a bunch of entities
- final long stopTime = System.currentTimeMillis() + 300; // + RUNTIME;
+ final long stopTime = System.currentTimeMillis() + RUNTIME;
final Map<String, Object> entityMap = new HashMap<>();
entityMap.put( "key1", 1000 );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ec144f3/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index efd9dfb..dc7ab3c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -105,7 +105,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
traceTagManager.attach( traceTag );
logger.info( "EntityDaoTest.testCreateAndGet" );
- UUID applicationId = createApplication( "testOrganization", "testCreateAndGet" );
+ UUID applicationId = createApplication( "EntityManagerFactoryImplIT", "testCreateAndGet" );
logger.info( "Application id " + applicationId );
EntityManager em = emf.getEntityManager( applicationId );
[07/13] git commit: Improvements to test, which now indexes data,
reads data, deletes application indexes,
rebuilds index and verifies that data is readable again.
Posted by to...@apache.org.
Improvements to test, which now indexes data, reads data, deletes application indexes, rebuilds index and verifies that data is readable again.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/df8c6b58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/df8c6b58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/df8c6b58
Branch: refs/heads/esbatching
Commit: df8c6b585c03040e3ba2819ef76a2875eea8f982
Parents: 352d69e
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 14:08:11 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 14:08:11 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 40 +++++-
.../corepersistence/CpEntityManagerFactory.java | 5 +-
.../corepersistence/CpRelationManager.java | 4 +-
.../PerformanceEntityRebuildIndexTest.java | 124 ++++++++++++++++---
.../index/impl/EsEntityIndexImpl.java | 18 +++
5 files changed, 167 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/df8c6b58/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 49be5c3..38453e1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2907,12 +2907,27 @@ public class CpEntityManager implements EntityManager {
CollectionScope collScope = new CollectionScopeImpl(
applicationScope.getApplication(),
applicationScope.getApplication(),
- CpEntityManager.getCollectionScopeNameFromCollectionName(collName));
+ CpEntityManager.getCollectionScopeNameFromEntityType(entity.getType()));
EntityCollectionManager collMgr =
managerCache.getEntityCollectionManager(collScope);
org.apache.usergrid.persistence.model.entity.Entity collEntity =
- collMgr.load( edge.getTargetNode()).toBlockingObservable().last();
+ collMgr.load( edge.getSourceNode() ).toBlockingObservable().last();
+
+// if (collEntity == null) {
+// if (logger.isDebugEnabled()) {
+// logger.error("FAILED to load entity {}:{} "
+// + "from scope\n app {}\n owner {}\n name {}",
+// new Object[]{
+// edge.getSourceNode().getType(),
+// edge.getSourceNode().getUuid(),
+// collScope.getApplication(),
+// collScope.getOwner(),
+// collScope.getName()
+// });
+// }
+// return;
+// }
CollectionScope memberScope = new CollectionScopeImpl(
applicationScope.getApplication(),
@@ -2924,6 +2939,21 @@ public class CpEntityManager implements EntityManager {
org.apache.usergrid.persistence.model.entity.Entity memberEntity =
memberMgr.load( edge.getTargetNode()).toBlockingObservable().last();
+// if (memberEntity == null) {
+// if (logger.isDebugEnabled()) {
+// logger.error("FAILED to load entity {}:{} "
+// + "from scope\n app {}\n owner {}\n name {}",
+// new Object[]{
+// edge.getTargetNode().getType(),
+// edge.getTargetNode().getUuid(),
+// memberScope.getApplication(),
+// memberScope.getOwner(),
+// memberScope.getName()
+// });
+// }
+// return;
+// }
+
indexEntityIntoCollections( collEntity, memberEntity, collName, true );
EntityRef ref = new SimpleEntityRef(
@@ -2948,7 +2978,7 @@ public class CpEntityManager implements EntityManager {
managerCache.getEntityCollectionManager(sourceScope);
org.apache.usergrid.persistence.model.entity.Entity sourceEntity =
- sourceEcm.load( edge.getTargetNode()).toBlockingObservable().last();
+ sourceEcm.load( fromEntityId ).toBlockingObservable().last();
CollectionScope targetScope = new CollectionScopeImpl(
applicationScope.getApplication(),
@@ -2998,6 +3028,10 @@ public class CpEntityManager implements EntityManager {
.getCollection( memberEntity.getId().getType(), collName);
if (connectBack && collection != null && collection.getLinkedCollection() != null) {
+
+ logger.debug("Linking back from entity in collection {} to collection {}",
+ collection.getName(), collection.getLinkedCollection());
+
indexEntityIntoCollections(
memberEntity, collectionEntity, collection.getLinkedCollection(), false );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/df8c6b58/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 00a808c..b3d5135 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -597,11 +597,13 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception {
- logger.info("Rebuilding index for application id {}", appId);
EntityManager em = getEntityManager( appId );
Application app = em.getApplication();
((CpEntityManager)em).reindex( po );
+ em.refreshIndex();
+
+ logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId );
}
@@ -616,6 +618,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po ) {
+ throw new UnsupportedOperationException( "Not supported yet." );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/df8c6b58/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 04ed701..be21e85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -916,7 +916,7 @@ public class CpRelationManager implements RelationManager {
results.merge( newResults );
}
- if ( crs.isEmpty() ) { // no more results
+ if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
satisfied = true;
} else if ( results.size() == query.getLimit() ) { // got what we need
@@ -933,7 +933,7 @@ public class CpRelationManager implements RelationManager {
logger.warn("Satisfy query limit {}, new limit {} query count {}", new Object[] {
originalLimit, query.getLimit(), queryCount
});
- }
+ }
}
return results;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/df8c6b58/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 7af66d3..663de59 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -33,11 +33,24 @@ import org.apache.usergrid.CoreApplication;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
+import com.google.inject.Injector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.usergrid.cassandra.Concurrent;
+import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -52,7 +65,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
- private static final long writeDelayMs = 9;
+ private static final long writeDelayMs = 15;
//private static final long readDelayMs = 7;
@Rule
@@ -81,12 +94,15 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
@Test
- public void rebuildIndex() {
+ public void rebuildIndex() throws Exception {
logger.info("Started rebuildIndex()");
final EntityManager em = app.getEntityManager();
- final long stopTime = System.currentTimeMillis() + RUNTIME;
+
+ // ----------------- create a bunch of entities
+
+ final long stopTime = System.currentTimeMillis() + 300; // + RUNTIME;
final Map<String, Object> entityMap = new HashMap<>();
entityMap.put( "key1", 1000 );
@@ -94,29 +110,48 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
entityMap.put( "key3", "Some value" );
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-
- int i = 0;
+ int entityCount = 0;
while ( System.currentTimeMillis() < stopTime ) {
-
- entityMap.put( "key", i );
+ entityMap.put("key", entityCount );
final Entity created;
try {
created = em.create("testType", entityMap );
} catch (Exception ex) {
throw new RuntimeException("Error creating entity", ex);
}
-
entityRefs.add( new SimpleEntityRef( created.getType(), created.getUuid() ) );
-
- if ( i % 100 == 0 ) {
- logger.info("Created {} entities", i );
+ if ( entityCount % 100 == 0 ) {
+ logger.info("Created {} entities", entityCount );
}
- i++;
-
+ entityCount++;
try { Thread.sleep( writeDelayMs ); } catch (InterruptedException ignored ) {}
}
- logger.info("Created {} entities", i);
+ logger.info("Created {} entities", entityCount);
+ em.refreshIndex();
+
+ // ----------------- test that we can read them, should work fine
+
+ logger.debug("Read the data");
+ readData("testTypes", entityCount );
+
+ // ----------------- delete the system and application indexes
+ logger.debug("Deleting app index and system app index");
+ deleteIndex( CpEntityManagerFactory.SYSTEM_APP_ID );
+ deleteIndex( em.getApplicationId() );
+
+ // ----------------- test that we can read them, should fail
+
+ logger.debug("Reading data, should fail this time ");
+ try {
+ readData( "testTypes", entityCount );
+ fail("should have failed to read data");
+
+ } catch (Exception expected) {}
+
+ // ----------------- rebuild index
+
+ logger.debug("Preparing to rebuild all indexes");;
final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
final Meter meter = registry.meter( meterName );
@@ -125,12 +160,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
int counter = 0;
@Override
public void onProgress( EntityRef s, EntityRef t, String etype ) {
-
meter.mark();
-
logger.debug("Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
-
if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
logger.info("Reindexed {} entities", counter );
}
@@ -142,12 +174,68 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
setup.getEmf().rebuildAllIndexes( po );
registry.remove( meterName );
- logger.info("Finished rebuildIndex()");
+ logger.info("Rebuilt index");
} catch (Exception ex) {
logger.error("Error rebuilding index", ex);
fail();
}
+ // ----------------- test that we can read them
+
+ readData( "testTypes", entityCount );
+ }
+
+ /**
+ * Delete index for all applications, just need the one to get started.
+ */
+ private void deleteIndex( UUID appUuid ) {
+
+ Injector injector = CpSetup.getInjector();
+ EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
+
+ Id appId = new SimpleId( appUuid, "application");
+ IndexScope is = new IndexScopeImpl( appId, appId, "application");
+ EntityIndex ei = eif.createEntityIndex(is);
+ EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei;
+
+ eeii.deleteIndex();
+ }
+
+ private int readData( String collectionName ) throws Exception {
+ return readData( collectionName, -1 );
+ }
+
+ private int readData( String collectionName, int expected ) throws Exception {
+
+ EntityManager em = app.getEntityManager();
+
+ Query q = Query.fromQL("select * where key1=1000");
+ q.setLimit(40);
+ Results results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+
+ int count = 0;
+ while ( true ) {
+
+ for ( Entity e : results.getEntities() ) {
+ assertEquals( 2000, e.getProperty("key2"));
+ //if ( count % 100 == 0 ) {
+ logger.info( "read {} entities", count++);
+ //}
+ }
+
+ if ( results.hasCursor() ) {
+ logger.info( "Counted {} : query again with cursor", count);
+ q.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+ } else {
+ break;
+ }
+ }
+
+ if ( expected != -1 && expected != count ) {
+ throw new RuntimeException("Did not get expected " + expected + " entities");
+ }
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/df8c6b58/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8401e13..2eb8414 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -58,7 +58,10 @@ import org.apache.usergrid.persistence.model.field.SetField;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.field.UUIDField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -671,11 +674,13 @@ public class EsEntityIndexImpl implements EntityIndex {
}
+ @Override
public void refresh() {
client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
log.debug("Refreshed index: " + indexName);
}
+
@Override
public CandidateResults getEntityVersions(Id id) {
Query query = new Query();
@@ -684,4 +689,17 @@ public class EsEntityIndexImpl implements EntityIndex {
return results;
}
+ /**
+ * For testing only.
+ */
+ public void deleteIndex() {
+ AdminClient adminClient = client.admin();
+ DeleteIndexResponse response = adminClient.indices().prepareDelete( indexName ).get();
+ if ( response.isAcknowledged() ) {
+ log.info("Deleted index: " + indexName );
+ } else {
+ log.info("Failed to delete index " + indexName );
+ }
+ }
+
}
[11/13] git commit: Merge branch 'two-dot-o-rebuildable-index' into
two-dot-o
Posted by to...@apache.org.
Merge branch 'two-dot-o-rebuildable-index' into two-dot-o
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2b8886e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2b8886e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2b8886e
Branch: refs/heads/esbatching
Commit: b2b8886ef3351a24c2ef5d7785c2d3fd09265353
Parents: 1e7da5c 9a670e3
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 23:28:19 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 23:28:19 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 519 +++++++++++++++----
.../corepersistence/CpEntityManagerFactory.java | 495 ++++++++----------
.../corepersistence/CpRelationManager.java | 356 +++++--------
.../usergrid/corepersistence/CpSetup.java | 55 +-
.../HybridEntityManagerFactory.java | 18 +-
.../persistence/EntityManagerFactory.java | 12 +-
.../persistence/cassandra/CassandraService.java | 5 -
.../cassandra/EntityManagerFactoryImpl.java | 54 +-
.../PerformanceEntityRebuildIndexTest.java | 278 ++++++++++
.../cassandra/EntityManagerFactoryImplIT.java | 10 +-
stack/core/src/test/resources/log4j.properties | 13 +-
.../usergrid/persistence/index/IndexFig.java | 7 +
.../index/impl/EsEntityIndexImpl.java | 18 +
.../persistence/index/impl/EsProvider.java | 15 +
.../org/apache/usergrid/tools/IndexRebuild.java | 63 ++-
stack/tools/src/main/resources/log4j.properties | 18 +-
16 files changed, 1206 insertions(+), 730 deletions(-)
----------------------------------------------------------------------
[05/13] git commit: Merge branch 'two-dot-o' into
two-dot-o-rebuildable-index
Posted by to...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-rebuildable-index
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3df5d4d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3df5d4d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3df5d4d2
Branch: refs/heads/esbatching
Commit: 3df5d4d25b83e39dfd1fbe0919bc68d3d84ed821
Parents: 1aa04a7 abbd76e
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 10:51:03 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 10:51:03 2014 -0400
----------------------------------------------------------------------
portal/js/global/ug-service.js | 10 +-
.../corepersistence/CpEntityManager.java | 3 +-
.../corepersistence/CpRelationManager.java | 6 +-
.../usergrid/mq/cassandra/QueueManagerImpl.java | 18 +-
.../mq/cassandra/io/AbstractSearch.java | 208 +++++++++++--------
.../mq/cassandra/io/ConsumerTransaction.java | 19 +-
.../persistence/PerformanceEntityReadTest.java | 2 +
.../persistence/PerformanceEntityWriteTest.java | 2 +
.../index/impl/EsEntityIndexImpl.java | 52 ++++-
.../persistence/index/impl/EsQueryVistor.java | 45 ++--
.../java/org/apache/usergrid/rest/BasicIT.java | 34 ++-
.../notifications/ApplicationQueueManager.java | 40 ++--
.../notifications/NotificationsService.java | 7 +-
.../notifications/NotificationsTaskManager.java | 33 ---
.../services/notifications/QueueListener.java | 64 +++---
.../services/notifications/QueueManager.java | 4 -
.../notifications/SingleQueueTaskManager.java | 10 +-
.../services/notifications/TaskTracker.java | 4 +-
.../resources/usergrid-services-context.xml | 9 +-
.../apns/NotificationsServiceIT.java | 6 +-
.../gcm/NotificationsServiceIT.java | 9 +-
21 files changed, 342 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3df5d4d2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3df5d4d2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
[06/13] git commit: Oops.
Posted by to...@apache.org.
Oops.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/352d69e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/352d69e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/352d69e0
Branch: refs/heads/esbatching
Commit: 352d69e02ece003acd6dc5f2027cbd15354d3714
Parents: 3df5d4d
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 10:55:54 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 10:55:54 2014 -0400
----------------------------------------------------------------------
.../apache/usergrid/corepersistence/CpEntityManagerFactory.java | 1 +
.../usergrid/corepersistence/HybridEntityManagerFactory.java | 4 ++--
.../org/apache/usergrid/persistence/EntityManagerFactory.java | 2 +-
.../usergrid/persistence/cassandra/EntityManagerFactoryImpl.java | 2 +-
.../src/main/java/org/apache/usergrid/tools/IndexRebuild.java | 3 ++-
5 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/352d69e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 168c1bf..00a808c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -617,4 +617,5 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po ) {
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/352d69e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
index 5897d6c..54a5dee 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
@@ -160,7 +160,7 @@ public class HybridEntityManagerFactory implements EntityManagerFactory, Applica
}
@Override
- public void rebuildCollectionIndex(UUID appId, String collection, Object object) {
- factory.rebuildCollectionIndex(appId, collection, object);
+ public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
+ factory.rebuildCollectionIndex(appId, collection, po);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/352d69e0/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index d962d2a..06c3114 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -128,7 +128,7 @@ public interface EntityManagerFactory {
/** For testing purposes */
public void flushEntityManagerCaches();
- public void rebuildCollectionIndex(UUID appId, String collection, Object object);
+ public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver object);
public interface ProgressObserver {
public void onProgress( EntityRef source, EntityRef target, String edgeType );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/352d69e0/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index 955e707..399bccd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -435,7 +435,7 @@ public class EntityManagerFactoryImpl implements EntityManagerFactory, Applicati
}
@Override
- public void rebuildCollectionIndex(UUID appId, String collection, Object object) {
+ public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
throw new UnsupportedOperationException("Not supported.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/352d69e0/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
index 66a5cfb..795fa1d 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
@@ -74,7 +74,8 @@ public class IndexRebuild extends ToolBase {
Option collOpt = OptionBuilder.withArgName( COLLECTION_ARG ).hasArg().isRequired( false )
.withDescription( "Collection name" ).create( COLLECTION_ARG );
- Option allOpt = OptionBuilder.withType( Boolean.class ).withArgName( ALL_ARG ).hasArg().isRequired( false )
+ Option allOpt = OptionBuilder.withType( Boolean.class )
+ .withArgName( ALL_ARG ).hasArg().isRequired( false )
.withDescription( "True to reindex all application" ).create( ALL_ARG );
Options options = new Options();
[10/13] git commit: Improve IndexRebuild test to test connections.
Posted by to...@apache.org.
Improve IndexRebuild test to test connections.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9a670e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9a670e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9a670e3d
Branch: refs/heads/esbatching
Commit: 9a670e3d28781acc8554030229b928b8e6b47f2c
Parents: a644034
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 20:55:36 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 20:55:36 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 7 ++-
.../corepersistence/CpRelationManager.java | 4 +-
.../PerformanceEntityRebuildIndexTest.java | 63 ++++++++++++++++----
3 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a670e3d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b3d5135..accf2f8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -326,7 +326,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
return null;
}
- return results.iterator().next().getUuid();
+ Entity entity = results.iterator().next();
+ Object uuidObject = entity.getProperty("applicationUuid");
+ if ( uuidObject instanceof UUID ) {
+ return (UUID)uuidObject;
+ }
+ return UUIDUtils.tryExtractUUID( entity.getProperty("applicationUuid").toString() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a670e3d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index be21e85..fba758f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -642,7 +642,7 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
- logger.debug("\n\nWrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}\n\n", new Object[] {
+ logger.debug("Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
memberEntity.getId().getType(), memberEntity.getId().getUuid(),
applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
@@ -998,7 +998,7 @@ public class CpRelationManager implements RelationManager {
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
- logger.debug("\n\nWrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}\n\n", new Object[] {
+ logger.debug("Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
targetEntity.getId().getType(), targetEntity.getId().getUuid(),
applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a670e3d/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 0f033a2..acfb1e0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -65,7 +65,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
- private static final long writeDelayMs = 15;
+ private static final long writeDelayMs = 100;
//private static final long readDelayMs = 7;
@Rule
@@ -103,29 +103,58 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- create a bunch of entities
final long stopTime = System.currentTimeMillis() + RUNTIME;
- final Map<String, Object> entityMap = new HashMap<>();
- entityMap.put( "key1", 1000 );
- entityMap.put( "key2", 2000 );
- entityMap.put( "key3", "Some value" );
+ Map<String, Object> entityMap = new HashMap<String, Object>() {{
+ put("key1", 1000 );
+ put("key2", 2000 );
+ put("key3", "Some value");
+ }};
+ Map<String, Object> cat1map = new HashMap<String, Object>() {{
+ put("name", "enzo");
+ put("color", "orange");
+ }};
+ Map<String, Object> cat2map = new HashMap<String, Object>() {{
+ put("name", "marquee");
+ put("color", "grey");
+ }};
+ Map<String, Object> cat3map = new HashMap<String, Object>() {{
+ put("name", "bertha");
+ put("color", "tabby");
+ }};
+
+ Entity cat1 = em.create("cat", cat1map );
+ Entity cat2 = em.create("cat", cat2map );
+ Entity cat3 = em.create("cat", cat3map );
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
int entityCount = 0;
while ( System.currentTimeMillis() < stopTime ) {
- entityMap.put("key", entityCount );
- final Entity created;
+
+ final Entity entity;
+
try {
- created = em.create("testType", entityMap );
+ entityMap.put("key", entityCount );
+ entity = em.create("testType", entityMap );
+
+ em.refreshIndex();
+
+ em.createConnection(entity, "herds", cat1);
+ em.createConnection(entity, "herds", cat2);
+ em.createConnection(entity, "herds", cat3);
+
} catch (Exception ex) {
throw new RuntimeException("Error creating entity", ex);
}
- entityRefs.add( new SimpleEntityRef( created.getType(), created.getUuid() ) );
+
+ entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
if ( entityCount % 100 == 0 ) {
logger.info("Created {} entities", entityCount );
}
+
entityCount++;
try { Thread.sleep( writeDelayMs ); } catch (InterruptedException ignored ) {}
}
+
logger.info("Created {} entities", entityCount);
em.refreshIndex();
@@ -218,23 +247,31 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
while ( true ) {
for ( Entity e : results.getEntities() ) {
+
assertEquals( 2000, e.getProperty("key2"));
- //if ( count % 100 == 0 ) {
- logger.info( "read {} entities", count++);
- //}
+
+ Results catResults = em.searchConnectedEntities(e, Query.fromQL("select *"));
+ assertEquals( 3, catResults.size() );
+
+ if ( count % 100 == 0 ) {
+ logger.info( "read {} entities", count);
+ }
+ count++;
}
if ( results.hasCursor() ) {
logger.info( "Counted {} : query again with cursor", count);
q.setCursor( results.getCursor() );
results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+
} else {
break;
}
}
if ( expected != -1 && expected != count ) {
- throw new RuntimeException("Did not get expected " + expected + " entities");
+ throw new RuntimeException("Did not get expected "
+ + expected + " entities, instead got " + count );
}
return count;
}
[12/13] git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into esbatching
Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into esbatching
Conflicts:
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae9974d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae9974d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae9974d3
Branch: refs/heads/esbatching
Commit: ae9974d38210cdef353820d40d8a59d0f0f4140b
Parents: bba08dd b2b8886
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 23:42:59 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 23:42:59 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 528 +++++++++++++++----
.../corepersistence/CpEntityManagerFactory.java | 499 +++++++-----------
.../corepersistence/CpRelationManager.java | 341 +++++-------
.../usergrid/corepersistence/CpSetup.java | 55 +-
.../HybridEntityManagerFactory.java | 18 +-
.../persistence/EntityManagerFactory.java | 12 +-
.../persistence/cassandra/CassandraService.java | 5 -
.../cassandra/EntityManagerFactoryImpl.java | 54 +-
.../PerformanceEntityRebuildIndexTest.java | 278 ++++++++++
.../cassandra/EntityManagerFactoryImplIT.java | 10 +-
stack/core/src/test/resources/log4j.properties | 13 +-
.../usergrid/persistence/index/IndexFig.java | 7 +
.../index/impl/EsEntityIndexImpl.java | 18 +
.../persistence/index/impl/EsProvider.java | 15 +
.../org/apache/usergrid/tools/IndexRebuild.java | 63 ++-
stack/tools/src/main/resources/log4j.properties | 18 +-
16 files changed, 1202 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae9974d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index a5475e7,38453e1..5ce871b
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@@ -73,8 -121,12 +121,13 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+ import org.apache.usergrid.persistence.graph.Edge;
+ import org.apache.usergrid.persistence.graph.GraphManager;
+ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.index.query.CounterResolution;
@@@ -586,24 -644,20 +650,24 @@@ public class CpEntityManager implement
}
}
+
+
// deindex from default index scope
IndexScope defaultIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
getCollectionScopeNameFromEntityType( entityRef.getType() ) );
- EntityIndex entityIndex = managerCache.getEntityIndex( defaultIndexScope );
- entityIndex.deindex( entity );
+
+ batch.deindex(defaultIndexScope, entity );
IndexScope allTypesIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
ALL_TYPES);
- EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
- aei.deindex( entity );
+
+ batch.deindex( allTypesIndexScope, entity );
+
+ batch.execute();
decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae9974d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 090a4d6,accf2f8..9a10afd
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@@ -47,23 -46,18 +46,19 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
- import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
+ import org.apache.usergrid.persistence.graph.Edge;
+ import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.index.EntityIndex;
- import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexScope;
- import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
- import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
- import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
- import org.apache.usergrid.persistence.model.field.Field;
- import org.apache.usergrid.persistence.model.field.LongField;
- import org.apache.usergrid.persistence.model.field.StringField;
- import org.apache.usergrid.persistence.model.field.UUIDField;
+ import org.apache.usergrid.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -321,11 -299,11 +300,11 @@@ public class CpEntityManagerFactory imp
public UUID lookupOrganization( String name) throws Exception {
+ init();
Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'");
-
- EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
- CandidateResults results = ei.search(SYSTEM_ORGS_INDEX_SCOPE, q );
- EntityManager em = getEntityManager(SYSTEM_APP_ID);
- Results results = em.searchCollection( em.getApplicationRef(), "organizations", q);
++ EntityManager em = getEntityManager( SYSTEM_APP_ID );
++ Results results = em.searchCollection( em.getApplicationRef(), "organizations", q );
if ( results.isEmpty() ) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae9974d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 260e429,fba758f..0073eb1
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@@ -27,13 -29,13 +29,12 @@@ import java.util.List
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import me.prettyprint.hector.api.mutation.Mutator;
-import static org.apache.usergrid.corepersistence.CpEntityManager.getEdgeTypeFromCollectionName;
-import static org.apache.usergrid.corepersistence.CpEntityManager.getEdgeTypeFromConnectionType;
+
- import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
@@@ -1108,25 -998,13 +1009,25 @@@ public class CpRelationManager implemen
GraphManager gm = managerCache.getGraphManager(applicationScope);
gm.writeEdge(edge).toBlockingObservable().last();
- final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
- final EntityIndexBatch batch = ei.createBatch();
-
- logger.debug("Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
- edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
- targetEntity.getId().getType(), targetEntity.getId().getUuid(),
- applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
+ // Index the new connection in app|source|type context
+ IndexScope indexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ cpHeadEntity.getId(),
+ CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
- batch.index( indexScope, targetEntity );
++ EntityIndex ei = managerCache.getEntityIndex(indexScope);
++ ei.index( targetEntity );
+
+ // Index the new connection in app|scope|all-types context
+ IndexScope allTypesIndexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ cpHeadEntity.getId(),
+ ALL_TYPES);
++ EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
++ aei.index( targetEntity );
- ((CpEntityManager)em).indexEntityIntoConnection(
- cpHeadEntity, targetEntity, connectedEntityRef.getType(), connectionType );
+ batch.index( allTypesIndexScope, targetEntity );
+
+ batch.execute();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = createMutator( ko, be );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae9974d3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 56316d8,2eb8414..7851afe
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@@ -281,18 -674,32 +281,36 @@@ public class EsEntityIndexImpl implemen
}
- @Override
+ public void refresh() {
+ client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+ log.debug("Refreshed index: " + indexName);
+ }
+
-
@Override
- public CandidateResults getEntityVersions(Id id) {
+ public CandidateResults getEntityVersions( final IndexScope scope, final Id id ) {
Query query = new Query();
- query.addEqualityFilter(ENTITYID_FIELDNAME,id.getUuid().toString());
- CandidateResults results = search( query );
+ query.addEqualityFilter( ENTITYID_FIELDNAME, id.getUuid().toString() );
+ CandidateResults results = search( scope, query );
return results;
}
+ /**
+ * For testing only.
+ */
+ public void deleteIndex() {
+ AdminClient adminClient = client.admin();
+ DeleteIndexResponse response = adminClient.indices().prepareDelete( indexName ).get();
+ if ( response.isAcknowledged() ) {
+ log.info("Deleted index: " + indexName );
+ } else {
+ log.info("Failed to delete index " + indexName );
+ }
+ }
+
+
+ @Override
+ public void refresh() {
+ client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+ log.debug( "Refreshed index: " + indexName );
+ }
}
[09/13] git commit: Merge branch 'two-dot-o' into
two-dot-o-rebuildable-index
Posted by to...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-rebuildable-index
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a644034c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a644034c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a644034c
Branch: refs/heads/esbatching
Commit: a644034c7e7731d10689b2ac925ee931daa5da9e
Parents: 3ec144f 34a2a6a
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 1 16:00:31 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 1 16:00:31 2014 -0400
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 12 +-
.../collection/event/EntityDeleted.java | 25 +
.../collection/event/EntityVersionCreated.java | 25 +
.../collection/event/EntityVersionDeleted.java | 29 +
.../collection/guice/CollectionModule.java | 18 +-
.../guice/CollectionTaskExecutor.java | 35 +
.../impl/EntityCollectionManagerImpl.java | 14 +
.../impl/EntityVersionCleanupTask.java | 198 ++++++
.../serialization/SerializationFig.java | 32 +-
.../serialization/impl/LogEntryIterator.java | 114 +++
.../EntityCollectionManagerStressTest.java | 2 +
.../impl/EntityVersionCleanupTaskTest.java | 690 +++++++++++++++++++
.../impl/LogEntryIteratorTest.java | 131 ++++
.../collection/util/LogEntryMock.java | 152 ++++
.../core/astyanax/AstyanaxKeyspaceProvider.java | 2 +
.../persistence/core/guice/CommonModule.java | 2 +
.../core/task/NamedTaskExecutorImpl.java | 167 +++++
.../usergrid/persistence/core/task/Task.java | 31 +
.../persistence/core/task/TaskExecutor.java | 23 +
.../core/task/NamedTaskExecutorImplTest.java | 227 ++++++
.../usergrid/persistence/graph/GraphFig.java | 2 +
.../persistence/graph/event/EdgeDeleted.java | 8 +
.../persistence/graph/guice/GraphModule.java | 16 +
.../graph/guice/GraphTaskExecutor.java | 33 +
.../impl/shard/impl/NodeShardCacheImpl.java | 5 -
.../shard/impl/ShardGroupCompactionImpl.java | 175 +++--
.../graph/GraphManagerStressTest.java | 1 +
.../impl/shard/ShardGroupCompactionTest.java | 5 +-
.../notifications/ApplicationQueueManager.java | 23 +-
.../notifications/SingleQueueTaskManager.java | 177 -----
.../services/notifications/TaskManager.java | 179 +++++
.../services/notifications/TaskTracker.java | 4 +-
.../apns/NotificationsServiceIT.java | 11 +-
.../gcm/NotificationsServiceIT.java | 8 +-
34 files changed, 2288 insertions(+), 288 deletions(-)
----------------------------------------------------------------------