You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/31 03:03:27 UTC
[3/3] git commit: working out multiget
working out multiget
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/94a92c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/94a92c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/94a92c39
Branch: refs/heads/datastax-driver
Commit: 94a92c394a5bfb8e04545076a13e2921c090236c
Parents: 6de7cbc
Author: Todd Nine <to...@apache.org>
Authored: Sat Aug 30 14:36:36 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Aug 30 19:03:06 2014 -0600
----------------------------------------------------------------------
.../collection/guice/MigrationManagerRule.java | 11 +-
.../core/astyanax/AstyanaxKeyspaceProvider.java | 1 +
.../persistence/core/astyanax/CassandraFig.java | 6 +-
.../persistence/core/guice/CommonModule.java | 15 +-
.../ArchaiusHystrixPoolConfiguration.java | 86 ---
.../core/javadriver/BatchStatementUtils.java | 72 +++
.../javadriver/DatastaxSessionProvider.java | 68 +--
.../javadriver/GuicyFigPoolConfiguration.java | 106 ++++
.../core/javadriver/RowIterator.java | 98 ++++
.../persistence/core/javadriver/RowParser.java | 40 ++
.../core/migration/DatastaxMigration.java | 39 ++
.../migration/DatastaxMigrationManager.java | 36 ++
.../migration/DatastaxMigrationManagerImpl.java | 127 +++++
.../cass-templates/cassandra-template-ug.yaml | 250 ++++-----
.../persistence/core/astyanax/TestUtils.java | 2 +-
.../core/javadriver/HystrixSessionTest.java | 2 +-
.../persistence/graph/guice/GraphModule.java | 11 +-
.../graph/impl/GraphManagerImpl.java | 34 +-
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 17 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 64 ++-
.../impl/stage/NodeDeleteListenerImpl.java | 23 +-
.../EdgeMetadataSerialization.java | 23 +-
.../graph/serialization/EdgeSerialization.java | 6 +-
.../graph/serialization/NodeSerialization.java | 8 +-
.../impl/EdgeMetadataSerializationImpl.java | 531 ++++++++++---------
.../impl/EdgeSerializationImpl.java | 9 +-
.../impl/NodeSerializationImpl.java | 221 +++++---
.../impl/shard/ShardEntryGroup.java | 4 +-
.../persistence/graph/GraphManagerIT.java | 25 +-
.../graph/impl/EdgeDeleteListenerTest.java | 22 +-
.../graph/impl/NodeDeleteListenerTest.java | 11 +-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 10 +-
.../graph/impl/stage/EdgeMetaRepairTest.java | 80 +--
.../EdgeMetadataSerializationTest.java | 285 +++++-----
.../EdgeSerializationChopTest.java | 7 +-
.../serialization/EdgeSerializationTest.java | 179 +++----
.../serialization/NodeSerializationTest.java | 27 +-
.../graph/test/util/EdgeTestUtils.java | 2 +-
stack/corepersistence/pom.xml | 5 +-
39 files changed, 1563 insertions(+), 1000 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
index 2d92adb..27c0b43 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
@@ -5,6 +5,8 @@ import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManager;
import org.apache.usergrid.persistence.core.migration.MigrationException;
import org.apache.usergrid.persistence.core.migration.MigrationManager;
@@ -18,12 +20,13 @@ import com.google.inject.Singleton;
public class MigrationManagerRule extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger( MigrationManagerRule.class );
- private MigrationManager migrationManager;
+
+ private DatastaxMigrationManager datastaxMigration;
@Inject
- public void setMigrationManager( MigrationManager migrationManager ) {
- this.migrationManager = migrationManager;
+ public void setDsMigrationManager(final DatastaxMigrationManager datastaxMigration){
+ this.datastaxMigration = datastaxMigration;
}
@@ -31,7 +34,7 @@ public class MigrationManagerRule extends ExternalResource {
protected void before() throws MigrationException {
LOG.info( "Starting migration" );
- migrationManager.migrate();
+ datastaxMigration.migrate();
LOG.info( "Migration complete" );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..fff471d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -59,6 +59,7 @@ public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
AstyanaxConfiguration config = new AstyanaxConfigurationImpl()
.setDiscoveryType( NodeDiscoveryType.valueOf( cassandraFig.getDiscoveryType() ) )
.setTargetCassandraVersion( cassandraFig.getVersion() )
+ .setCqlVersion( "3.0.0" )
.setDefaultReadConsistencyLevel( cassandraConfig.getReadCL() )
.setDefaultWriteConsistencyLevel( cassandraConfig.getWriteCL() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 6e18a9a..630dece 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -41,7 +41,7 @@ public interface CassandraFig extends GuicyFig {
String getHosts();
@Key( "cassandra.version" )
- @Default( "1.2" )
+ @Default( "2.1" )
String getVersion();
@Key( "cassandra.cluster_name" )
@@ -76,6 +76,10 @@ public interface CassandraFig extends GuicyFig {
@Default( "false" )
boolean isEmbedded();
+ @Key("cassandra.concurrent.invocations")
+ @Default( "100" )
+ int getConcurrentInvocations();
+
@Default("CL_QUORUM")
@Key(READ_CL)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..849dc52 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -27,10 +27,15 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.javadriver.DatastaxSessionProvider;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManager;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManagerImpl;
import org.apache.usergrid.persistence.core.migration.MigrationManager;
import org.apache.usergrid.persistence.core.migration.MigrationManagerFig;
import org.apache.usergrid.persistence.core.migration.MigrationManagerImpl;
+import com.datastax.driver.core.Session;
import com.google.inject.AbstractModule;
import com.netflix.astyanax.Keyspace;
@@ -52,13 +57,21 @@ public class CommonModule extends AbstractModule {
// bind our keyspace to the AstyanaxKeyspaceProvider
bind( Keyspace.class ).toProvider( AstyanaxKeyspaceProvider.class ).asEagerSingleton();
+
+ bind( Session.class).toProvider( DatastaxSessionProvider.class ).asEagerSingleton();
+
+
+ //bind our datastax migration manager
+ bind( DatastaxMigrationManager.class ).to( DatastaxMigrationManagerImpl.class ).asEagerSingleton();
+
// bind our migration manager
- bind( MigrationManager.class ).to( MigrationManagerImpl.class );
+// bind( MigrationManager.class ).to( MigrationManagerImpl.class );
bind( TimeService.class ).to( TimeServiceImpl.class );
bind( CassandraConfig.class ).to( CassandraConfigImpl.class );
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
deleted file mode 100644
index 441fabc..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-package org.apache.usergrid.persistence.core.javadriver;
-
-
-import java.util.Collection;
-
-
-/**
- *
- */
-public class ArchaiusHystrixPoolConfiguration implements HystrixPoolConfiguration {
-
- //TODO implement this
-
- @Override
- public String getServiceName() {
- return null;
- }
-
-
- @Override
- public int getPort() {
- return 0;
- }
-
-
- @Override
- public void addPortLister( final PropertyChangeListener<Integer> listener ) {
-
- }
-
-
- @Override
- public Collection<String> getSeedNodes() {
- return null;
- }
-
-
- @Override
- public void addSeedNodesLister( final PropertyChangeListener<Collection<String>> listener ) {
-
- }
-
-
- @Override
- public int getMaxConnectionsPerHost() {
- return 0;
- }
-
-
- @Override
- public void addMaxConnectionsLister( final PropertyChangeListener<Integer> listener ) {
-
- }
-
-
- @Override
- public int getDefaultConnectionsPerIdentity() {
- return 0;
- }
-
-
- @Override
- public void addDefaultConnectionsPerIdentity( final PropertyChangeListener<Integer> listener ) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
new file mode 100644
index 0000000..feedd6e
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Collection;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+
+/**
+ * Utility class for rolling up multiple statements as a batch
+ *
+ */
+public class BatchStatementUtils {
+
+
+ /**
+ * Roll up the statements into a single unlogged (non-atomic) batch and execute it
+ * @param session
+ * @param statements
+ */
+ public static ResultSet runBatches( final Session session,
+ Collection<? extends Statement>... statements ){
+
+
+ final BatchStatement batch = fastStatement();
+
+ for(Collection<? extends Statement> statement: statements){
+ batch.addAll( statement );
+ }
+
+
+ return session.execute( batch );
+ }
+
+
+ /**
+ * We're running on an eventually consistent system, don't use a logged batch (atomic, written via the batch log)
+ * unless you absolutely must!
+ *
+ * @return
+ */
+ public static BatchStatement fastStatement(){
+ return new BatchStatement( BatchStatement.Type.UNLOGGED );
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
index 776793d..168d781 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
@@ -21,20 +21,12 @@
package org.apache.usergrid.persistence.core.javadriver;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import com.datastax.driver.core.Session;
import com.google.inject.Inject;
import com.google.inject.Provider;
-import com.netflix.astyanax.AstyanaxConfiguration;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.Slf4jConnectionPoolMonitorImpl;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.google.inject.Singleton;
/**
@@ -43,56 +35,24 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
*
* @author tnine
*/
-public class DatastaxSessionProvider implements Provider<Keyspace> {
- private final CassandraFig cassandraFig;
- private final CassandraConfig cassandraConfig;
+@Singleton
+public class DatastaxSessionProvider implements Provider<Session> {
+ private final HystrixPool pool;
+ private final Session session;
- @Inject
- public DatastaxSessionProvider( final CassandraFig cassandraFig, final CassandraConfig cassandraConfig ) {
- this.cassandraFig = cassandraFig;
- this.cassandraConfig = cassandraConfig;
- }
-
-
- @Override
- public Keyspace get() {
-
- AstyanaxConfiguration config = new AstyanaxConfigurationImpl()
- .setDiscoveryType( NodeDiscoveryType.valueOf( cassandraFig.getDiscoveryType() ) )
- .setTargetCassandraVersion( cassandraFig.getVersion() )
- .setDefaultReadConsistencyLevel( cassandraConfig.getReadCL() )
- .setDefaultWriteConsistencyLevel( cassandraConfig.getWriteCL() );
-
- ConnectionPoolConfiguration connectionPoolConfiguration =
- new ConnectionPoolConfigurationImpl( "UsergridConnectionPool" )
- .setPort( cassandraFig.getThriftPort() )
- .setMaxConnsPerHost( cassandraFig.getConnections() )
- .setSeeds( cassandraFig.getHosts() )
- .setSocketTimeout( cassandraFig.getTimeout() );
- AstyanaxContext<Keyspace> context =
- new AstyanaxContext.Builder().forCluster( cassandraFig.getClusterName() )
- .forKeyspace( cassandraFig.getKeyspaceName() )
-
- /*
- * TODO tnine Filter this by adding a host supplier. We will get token discovery from cassandra
- * but only connect
- * to nodes that have been specified. Good for real time updates of the cass system without
- * adding
- * load to them during runtime
- */
-
- .withAstyanaxConfiguration( config )
- .withConnectionPoolConfiguration( connectionPoolConfiguration )
- .withConnectionPoolMonitor( new Slf4jConnectionPoolMonitorImpl() )
- .buildKeyspace( ThriftFamilyFactory.getInstance() );
-
- context.start();
+ @Inject
+ public DatastaxSessionProvider( final CassandraFig cassandraFig ) {
+ pool = new HystrixPoolImpl( new GuicyFigPoolConfiguration( cassandraFig ) );
+ session = pool.getSession( "usergrid" );
- return context.getClient();
}
+ @Override
+ public Session get() {
+ return session;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
new file mode 100644
index 0000000..1bd003f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ *
+ */
+@Singleton
+public class GuicyFigPoolConfiguration implements HystrixPoolConfiguration {
+
+ private final CassandraFig cassandraFig;
+
+
+ @Inject
+ public GuicyFigPoolConfiguration( final CassandraFig cassandraFig ) {this.cassandraFig = cassandraFig;}
+
+
+ @Override
+ public String getServiceName() {
+ return "Usergrid";
+ }
+
+
+ @Override
+ public int getPort() {
+ return cassandraFig.getNativePort();
+ }
+
+
+ @Override
+ public void addPortLister( final PropertyChangeListener<Integer> listener ) {
+
+ }
+
+
+ @Override
+ public Collection<String> getSeedNodes() {
+
+ String hosts = cassandraFig.getHosts();
+
+ String[] splitHosts = hosts.split( "," );
+
+ return Arrays.asList( splitHosts );
+ }
+
+
+ @Override
+ public void addSeedNodesLister( final PropertyChangeListener<Collection<String>> listener ) {
+
+ }
+
+
+ @Override
+ public int getMaxConnectionsPerHost() {
+ return this.cassandraFig.getConnections();
+ }
+
+
+ @Override
+ public void addMaxConnectionsLister( final PropertyChangeListener<Integer> listener ) {
+
+ }
+
+
+ @Override
+ public int getDefaultConnectionsPerIdentity() {
+ /**
+ * The default is arbitrary and SUPER high. T.N. The value is read from CassandraFig#getConcurrentInvocations().
+ */
+ return this.cassandraFig.getConcurrentInvocations();
+ }
+
+
+ @Override
+ public void addDefaultConnectionsPerIdentity( final PropertyChangeListener<Integer> listener ) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
new file mode 100644
index 0000000..f47ffa3
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.query.RowQuery;
+
+
+/**
+ * Simple iterator that wraps a Row query and will keep executing its paging until there are no more results to read
+ * from cassandra
+ */
+public class RowIterator<T> implements Iterable<T>, Iterator<T> {
+
+
+ private final RowParser<T> parser;
+ private final Session session;
+ private final Statement statement;
+
+ private Iterator<Row> iterator;
+
+
+
+ public RowIterator( final Session session, final Statement statement, final RowParser<T> parser ) {
+ this.session = session;
+ this.statement = statement;
+ this.parser = parser;
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return this;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+
+ if ( iterator == null ) {
+ startIterator();
+ }
+
+ return iterator.hasNext();
+ }
+
+
+ @Override
+ public T next() {
+
+ if ( !hasNext() ) {
+ throw new NoSuchElementException();
+ }
+
+ return parser.parseRow( iterator.next());
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Execute the query again and set the results
+ */
+ private void startIterator() {
+
+ iterator = session.execute( statement ).iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
new file mode 100644
index 0000000..a620244
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import com.datastax.driver.core.Row;
+import com.netflix.astyanax.model.Column;
+
+
+/**
+ * Row parser to be used in iterators
+ */
+public interface RowParser<T> {
+
+ /**
+ * Parse the row and return the object
+ * @param row The row to parse
+ * @return
+ */
+ public T parseRow( Row row );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
new file mode 100644
index 0000000..928278b
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration;
+
+
+import java.util.Collection;
+
+
+
+public interface DatastaxMigration {
+
+ /**
+ * Get the column families required for this implementation. If one does not exist it will be created.
+ */
+ public Collection<String> getColumnFamilies();
+
+ /**
+ * Create all prepared statements required for the implementation
+ */
+ public void prepareStatements();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
new file mode 100644
index 0000000..a2ef364
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration;
+
+
+/**
+ * A manager that will perform any migrations necessary. Setup code should invoke the implementation of this interface
+ *
+ * @author tnine
+ */
+public interface DatastaxMigrationManager {
+
+ /**
+ * Perform any migration necessary in the application. Will only create keyspaces and column families if they do
+ * not exist
+ */
+ public void migrate() throws MigrationException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
new file mode 100644
index 0000000..3a3e2db
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration;
+
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Migration manager that creates the keyspace, runs each migration's column family statements, then prepares its statements
+ *
+ * @author tnine
+ */
+@Singleton
+public class DatastaxMigrationManagerImpl implements DatastaxMigrationManager {
+
+
+ private static final Logger logger = LoggerFactory.getLogger( DatastaxMigrationManagerImpl.class );
+
+ private final Set<DatastaxMigration> migrations;
+ private final Session session;
+
+ private final CassandraFig cassandraFig;
+
+
+ @Inject
+ public DatastaxMigrationManagerImpl( final Session session, final CassandraFig cassandraFig,
+ final Set<DatastaxMigration> migrations ) {
+ this.session = session;
+ this.migrations = migrations;
+ this.cassandraFig = cassandraFig;
+ }
+
+
+ @Override
+ public void migrate() throws MigrationException {
+
+
+ testAndCreateKeyspace();
+
+ for ( DatastaxMigration migration : migrations ) {
+
+ final Collection<String> columnFamilies = migration.getColumnFamilies();
+
+
+ if ( columnFamilies == null || columnFamilies.size() == 0 ) {
+ logger.warn( "Class {} implements {} but returns null column families for migration. Either implement"
+ + " this method or remove the interface from the class", migration.getClass(),
+ Migration.class );
+ continue;
+ }
+
+ for ( String cf : columnFamilies ) {
+ testAndCreateColumnFamilyDef( cf );
+ }
+
+ /**
+ * Now that all the column families have been created, initialize the prepared statements
+ */
+ migration.prepareStatements();
+ }
+ }
+
+
+ /**
+ * Execute the supplied column family statement. Any failure is logged and rethrown as a MigrationException
+ */
+ private void testAndCreateColumnFamilyDef( String columnFamilyExecute ) throws MigrationException {
+
+ try {
+ session.execute( columnFamilyExecute );
+ }
+ catch ( Throwable t ) {
+ final String message =
+ String.format( "Unable to create column family. Input string was \"%s\" ", columnFamilyExecute );
+ logger.error( message, t );
+ throw new MigrationException( message, t );
+ }
+ }
+
+
+ /**
+ * Create the configured keyspace if it does not exist (SimpleStrategy, replication factor 1), then USE it on the session
+ */
+ private void testAndCreateKeyspace() {
+
+
+ final String keyspaceName = cassandraFig.getKeyspaceName();
+
+
+ final String createStatement = "CREATE KEYSPACE IF NOT EXISTS " + keyspaceName
+ + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+ session.execute( createStatement );
+
+ //make the keyspace the current one for this session
+ session.execute( "USE " + keyspaceName );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml b/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
index 5e0e7d6..03e27db 100644
--- a/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
+++ b/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
@@ -9,7 +9,6 @@
# one logical cluster from joining another.
cluster_name: '$CLUSTER$'
-
# This defines the number of tokens randomly assigned to this node on the ring
# The more tokens, relative to other nodes, the larger the proportion of data
# that this node will store. You probably want all nodes to have the same number
@@ -20,23 +19,19 @@ cluster_name: '$CLUSTER$'
#
# Specifying initial_token will override this setting.
#
-# If you already have a cluster with 1 token per node, and wish to migrate to
+# If you already have a cluster with 1 token per node, and wish to migrate to
# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
-# num_tokens: 256
+num_tokens: 256
-# If you haven't specified num_tokens, or have set it to the default of 1 then
-# you should always specify InitialToken when setting up a production
-# cluster for the first time, and often when adding capacity later.
-# The principle is that each node should be given an equal slice of
-# the token ring; see http://wiki.apache.org/cassandra/Operations
-# for more details.
-#
-# If blank, Cassandra will request a token bisecting the range of
-# the heaviest-loaded existing node. If there is no load information
-# available, such as is the case with a new cluster, it will pick
-# a random token, which will lead to hot spots.
-initial_token:
+# initial_token allows you to specify tokens manually. While you can use it with
+# vnodes (num_tokens > 1, above) -- in which case you should provide a
+# comma-separated list -- it's primarily used when adding nodes to legacy clusters
+# that do not have vnodes enabled.
+# initial_token:
+# May either be "true" or "false" to enable globally, or contain a list
+# of data centers to enable per-datacenter.
+# hinted_handoff_enabled: DC1,DC2
# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
@@ -58,11 +53,6 @@ max_hints_delivery_threads: 2
# reduced proportionally to the number of nodes in the cluster.
batchlog_replay_throttle_in_kb: 1024
-# The following setting populates the page cache on memtable flush and compaction
-# WARNING: Enable this setting only when the whole node's data fits in memory.
-# Defaults to: false
-# populate_io_cache_on_flush: false
-
# Authentication backend, implementing IAuthenticator; used to identify users
# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
# PasswordAuthenticator}.
@@ -88,27 +78,16 @@ authorizer: AllowAllAuthorizer
# Will be disabled automatically for AllowAllAuthorizer.
permissions_validity_in_ms: 2000
-# The partitioner is responsible for distributing rows (by key) across
-# nodes in the cluster. Any IPartitioner may be used, including your
-# own as long as it is on the classpath. Out of the box, Cassandra
-# provides org.apache.cassandra.dht.{Murmur3Partitioner, RandomPartitioner
-# ByteOrderedPartitioner, OrderPreservingPartitioner (deprecated)}.
-#
-# - RandomPartitioner distributes rows across the cluster evenly by md5.
-# This is the default prior to 1.2 and is retained for compatibility.
-# - Murmur3Partitioner is similar to RandomPartioner but uses Murmur3_128
-# Hash Function instead of md5. When in doubt, this is the best option.
-# - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows
-# scanning rows in key order, but the ordering can generate hot spots
-# for sequential insertion workloads.
-# - OrderPreservingPartitioner is an obsolete form of BOP, that stores
-# - keys in a less-efficient format and only works with keys that are
-# UTF8-encoded Strings.
-# - CollatingOPP collates according to EN,US rules rather than lexical byte
-# ordering. Use this as an example if you need custom collation.
-#
-# See http://wiki.apache.org/cassandra/Operations for more on
-# partitioners and token selection.
+# The partitioner is responsible for distributing groups of rows (by
+# partition key) across nodes in the cluster. You should leave this
+# alone for new clusters. The partitioner can NOT be changed without
+# reloading all data, so when upgrading you should set this to the
+# same partitioner you were already using.
+#
+# Besides Murmur3Partitioner, partitioners included for backwards
+# compatibility include RandomPartitioner, ByteOrderedPartitioner, and
+# OrderPreservingPartitioner.
+#
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# Directories where Cassandra should store data on disk. Cassandra
@@ -121,6 +100,7 @@ data_file_directories:
commitlog_directory: $DIR$/commitlog
# policy for data disk failures:
+# stop_paranoid: shut down gossip and Thrift even for single-sstable errors.
# stop: shut down gossip and Thrift, leaving the node effectively dead, but
# can still be inspected via JMX.
# best_effort: stop using the failed disk and respond to requests based on
@@ -129,6 +109,14 @@ commitlog_directory: $DIR$/commitlog
# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
disk_failure_policy: stop
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but
+# continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
# Maximum size of the key cache in memory.
#
# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
@@ -179,28 +167,23 @@ row_cache_save_period: 0
# Disabled by default, meaning all keys are going to be saved
# row_cache_keys_to_save: 100
-# The provider for the row cache to use.
-#
-# Supported values are: ConcurrentLinkedHashCacheProvider, SerializingCacheProvider
+# The off-heap memory allocator. Affects storage engine metadata as
+# well as caches. Experiments show that JEMalloc uses less memory
+# than the native GCC allocator (i.e., JEMalloc is more
+# fragmentation-resistant).
+#
+# Supported values are: NativeAllocator, JEMallocAllocator
#
-# SerializingCacheProvider serialises the contents of the row and stores
-# it in native memory, i.e., off the JVM Heap. Serialized rows take
-# significantly less memory than "live" rows in the JVM, so you can cache
-# more rows in a given memory footprint. And storing the cache off-heap
-# means you can use smaller heap sizes, reducing the impact of GC pauses.
-# Note however that when a row is requested from the row cache, it must be
-# deserialized into the heap for use.
+# If you intend to use JEMallocAllocator you have to install JEMalloc as library and
+# modify cassandra-env.sh as directed in the file.
#
-# It is also valid to specify the fully-qualified class name to a class
-# that implements org.apache.cassandra.cache.IRowCacheProvider.
-#
-# Defaults to SerializingCacheProvider
-row_cache_provider: SerializingCacheProvider
+# Defaults to NativeAllocator
+# memory_allocator: NativeAllocator
# saved caches
saved_caches_directory: $DIR$/saved_caches
-# commitlog_sync may be either "periodic" or "batch."
+# commitlog_sync may be either "periodic" or "batch."
# When in batch mode, Cassandra won't ack writes until the commit log
# has been fsynced to disk. It will wait up to
# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
@@ -222,7 +205,7 @@ commitlog_sync_period_in_ms: 10000
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
-# flushed to sstables.
+# flushed to sstables.
#
# The default size is 32, which is almost always fine, but if you are
# archiving commitlog segments (see commitlog_archiving.properties),
@@ -233,7 +216,7 @@ commitlog_segment_size_in_mb: 32
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
- # Addresses of hosts that are deemed contact points.
+ # Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
# multiple nodes!
@@ -243,31 +226,6 @@ seed_provider:
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1"
-# emergency pressure valve: each time heap usage after a full (CMS)
-# garbage collection is above this fraction of the max, Cassandra will
-# flush the largest memtables.
-#
-# Set to 1.0 to disable. Setting this lower than
-# CMSInitiatingOccupancyFraction is not likely to be useful.
-#
-# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
-# it is most effective under light to moderate load, or read-heavy
-# workloads; under truly massive write load, it will often be too
-# little, too late.
-flush_largest_memtables_at: 0.75
-
-# emergency pressure valve #2: the first time heap usage after a full
-# (CMS) garbage collection is above this fraction of the max,
-# Cassandra will reduce cache maximum _capacity_ to the given fraction
-# of the current _size_. Should usually be set substantially above
-# flush_largest_memtables_at, since that will have less long-term
-# impact on the system.
-#
-# Set to 1.0 to disable. Setting this lower than
-# CMSInitiatingOccupancyFraction is not likely to be useful.
-reduce_cache_sizes_at: 0.85
-reduce_cache_capacity_to: 0.6
-
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
# disk. "concurrent_reads" should be set to (16 * number_of_drives) in
@@ -280,9 +238,13 @@ reduce_cache_capacity_to: 0.6
concurrent_reads: 32
concurrent_writes: 32
+# Total memory to use for sstable-reading buffers. Defaults to
+# the smaller of 1/4 of heap or 512MB.
+# file_cache_size_in_mb: 512
+
# Total memory to use for memtables. Cassandra will flush the largest
# memtable when this much memory is used.
-# If omitted, Cassandra will set it to 1/3 of the heap.
+# If omitted, Cassandra will set it to 1/4 of the heap.
# memtable_total_space_in_mb: 2048
# Total space to use for commitlogs. Since commitlog segments are
@@ -325,7 +287,7 @@ ssl_storage_port: 7001
# Address to bind to and tell other Cassandra nodes to connect to. You
# _must_ change this if you want multiple nodes to be able to
# communicate!
-#
+#
# Leaving it blank leaves it up to InetAddress.getLocalHost(). This
# will always do the Right Thing _if_ the node is properly configured
# (hostname, name resolution, etc), and the Right Thing is to use the
@@ -348,13 +310,15 @@ listen_address: localhost
start_native_transport: true
# port for the CQL native transport to listen for clients on
native_transport_port: $NATIVE_PORT$
-# The minimum and maximum threads for handling requests when the native
-# transport is used. They are similar to rpc_min_threads and rpc_max_threads,
-# though the defaults differ slightly.
-# NOTE: native_transport_min_threads is now deprecated and ignored (but kept
-# in the 1.2.x series for compatibility sake).
-# native_transport_min_threads: 16
+# The maximum threads for handling requests when the native transport is used.
+# This is similar to rpc_max_threads though the default differs slightly (and
+# there is no native_transport_min_threads, idle threads will always be stopped
+# after 30 seconds).
# native_transport_max_threads: 128
+#
+# The maximum size of allowed frame. Frame (requests) larger than this will
+# be rejected as invalid. The default is 256MB.
+# native_transport_max_frame_size_in_mb: 256
# Whether to start the thrift rpc server.
start_rpc: true
@@ -366,16 +330,16 @@ start_rpc: true
# (i.e. it will be based on the configured hostname of the node).
#
# Note that unlike ListenAddress above, it is allowed to specify 0.0.0.0
-# here if you want to listen on all interfaces, but that will break clients
+# here if you want to listen on all interfaces, but that will break clients
# that rely on node auto-discovery.
rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: $THRIFT_PORT$
-# enable or disable keepalive on rpc connections
+# enable or disable keepalive on rpc/native connections
rpc_keepalive: true
-# Cassandra provides three out-of-the-box options for the RPC Server:
+# Cassandra provides two out-of-the-box options for the RPC Server:
#
# sync -> One thread per thrift connection. For a very large number of clients, memory
# will be your limiting factor. On a 64 bit JVM, 180KB is the minimum stack size
@@ -439,15 +403,22 @@ incremental_backups: false
snapshot_before_compaction: false
# Whether or not a snapshot is taken of the data before keyspace truncation
-# or dropping of column families. The STRONGLY advised default of true
+# or dropping of column families. The STRONGLY advised default of true
# should be used to provide data safety. If you set this flag to false, you will
# lose data on truncation or drop.
auto_snapshot: true
-# Log a debug message if more than this many tombstones are scanned
-# in a single-partition query. Set the threshold on SliceQueryFilter
-# to debug to enable.
-tombstone_debug_threshold: 10000
+# When executing a scan, within or across a partition, we need to keep the
+# tombstones seen in memory so we can return them to the coordinator, which
+# will use them to make sure other replicas also know about the deleted rows.
+# With workloads that generate a lot of tombstones, this can cause performance
+# problems and even exhaust the server heap.
+# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
+# Adjust the thresholds here if you understand the dangers and want to
+# scan more tombstones anyway. These thresholds may also be adjusted at runtime
+# using the StorageService mbean.
+tombstone_warn_threshold: 1000
+tombstone_failure_threshold: 100000
# Add column indexes to a row after its contents reach this size.
# Increase if your column values are large, or if you have a very large
@@ -458,6 +429,11 @@ tombstone_debug_threshold: 10000
# that wastefully either.
column_index_size_in_kb: 64
+
+# Log WARN on any batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
# Size limit for rows being compacted in memory. Larger rows will spill
# over to disk and use a slower two-pass compaction process. A message
# will be logged specifying the row key.
@@ -478,7 +454,7 @@ in_memory_compaction_limit_in_mb: 64
# Multi-threaded compaction. When enabled, each compaction will use
# up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise,
+# This is usually only useful for SSD-based hardware: otherwise,
# your concern is usually to get compaction to do LESS i/o (see:
# compaction_throughput_mb_per_sec), not more.
multithreaded_compaction: false
@@ -504,11 +480,14 @@ compaction_preheat_key_cache: true
# stream_throughput_outbound_megabits_per_sec: 200
# How long the coordinator should wait for read operations to complete
-read_request_timeout_in_ms: 10000
+read_request_timeout_in_ms: 5000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
-write_request_timeout_in_ms: 10000
+write_request_timeout_in_ms: 2000
+# How long a coordinator should continue to retry a CAS operation
+# that contends with other proposals for the same row
+cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
@@ -517,8 +496,10 @@ truncate_request_timeout_in_ms: 60000
request_timeout_in_ms: 10000
# Enable operation timeout information exchange between nodes to accurately
-# measure request timeouts, If disabled cassandra will assuming the request
-# was forwarded to the replica instantly by the coordinator
+# measure request timeouts. If disabled, replicas will assume that requests
+# were forwarded to them instantly by the coordinator, which means that
+# under overload conditions we will waste that much extra time processing
+# already-timed-out requests.
#
# Warning: before enabling this property make sure to ntp is installed
# and the times are synchronized between the nodes.
@@ -551,23 +532,18 @@ cross_node_timeout: false
#
# Out of the box, Cassandra provides
# - SimpleSnitch:
-# Treats Strategy order as proximity. This improves cache locality
-# when disabling read repair, which can further improve throughput.
-# Only appropriate for single-datacenter deployments.
+# Treats Strategy order as proximity. This can improve cache
+# locality when disabling read repair. Only appropriate for
+# single-datacenter deployments.
+# - GossipingPropertyFileSnitch
+# This should be your go-to snitch for production use. The rack
+# and datacenter for the local node are defined in
+# cassandra-rackdc.properties and propagated to other nodes via
+# gossip. If cassandra-topology.properties exists, it is used as a
+# fallback, allowing migration from the PropertyFileSnitch.
# - PropertyFileSnitch:
# Proximity is determined by rack and data center, which are
# explicitly configured in cassandra-topology.properties.
-# - GossipingPropertyFileSnitch
-# The rack and datacenter for the local node are defined in
-# cassandra-rackdc.properties and propagated to other nodes via gossip. If
-# cassandra-topology.properties exists, it is used as a fallback, allowing
-# migration from the PropertyFileSnitch.
-# - RackInferringSnitch:
-# Proximity is determined by rack and data center, which are
-# assumed to correspond to the 3rd and 2nd octet of each node's
-# IP address, respectively. Unless this happens to match your
-# deployment conventions (as it did Facebook's), this is best used
-# as an example of writing a custom Snitch class.
# - Ec2Snitch:
# Appropriate for EC2 deployments in a single Region. Loads Region
# and Availability Zone information from the EC2 API. The Region is
@@ -581,6 +557,12 @@ cross_node_timeout: false
# ssl_storage_port on the public IP firewall. (For intra-Region
# traffic, Cassandra will switch to the private IP after
# establishing a connection.)
+# - RackInferringSnitch:
+# Proximity is determined by rack and data center, which are
+# assumed to correspond to the 3rd and 2nd octet of each node's IP
+# address, respectively. Unless this happens to match your
+# deployment conventions, this is best used as an example of
+# writing a custom Snitch class and is provided in that spirit.
#
# You can use a custom Snitch by setting this to the full class name
# of the snitch, which will be assumed to be on your classpath.
@@ -588,7 +570,7 @@ endpoint_snitch: SimpleSnitch
# controls how often to perform the more expensive part of host score
# calculation
-dynamic_snitch_update_interval_in_ms: 100
+dynamic_snitch_update_interval_in_ms: 100
# controls how often to reset all host scores, allowing a bad host to
# possibly recover
dynamic_snitch_reset_interval_in_ms: 600000
@@ -618,7 +600,7 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler
# NoScheduler - Has no options
# RoundRobin
# - throttle_limit -- The throttle_limit is the number of in-flight
-# requests per client. Requests beyond
+# requests per client. Requests beyond
# that limit are queued up until
# running requests can complete.
# The value of 80 here is twice the number of
@@ -641,22 +623,11 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler
# the request scheduling. Currently the only valid option is keyspace.
# request_scheduler_id: keyspace
-# index_interval controls the sampling of entries from the primrary
-# row index in terms of space versus time. The larger the interval,
-# the smaller and less effective the sampling will be. In technicial
-# terms, the interval coresponds to the number of index entries that
-# are skipped between taking each sample. All the sampled entries
-# must fit in memory. Generally, a value between 128 and 512 here
-# coupled with a large key cache size on CFs results in the best trade
-# offs. This value is not often changed, however if you have many
-# very small rows (many to an OS page), then increasing this will
-# often lower memory usage without a impact on performance.
-index_interval: 128
-
# Enable or disable inter-node encryption
# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
# suite for authentication, key exchange and encryption of the actual data transfers.
+# Use the DHE/ECDHE ciphers if running in FIPS 140 compliant mode.
# NOTE: No custom encryption options are enabled at the moment
# The available internode options are : all, none, dc, rack
#
@@ -677,7 +648,7 @@ server_encryption_options:
# protocol: TLS
# algorithm: SunX509
# store_type: JKS
- # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+ # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
# require_client_auth: false
# enable or disable client/server encryption.
@@ -693,17 +664,24 @@ client_encryption_options:
# protocol: TLS
# algorithm: SunX509
# store_type: JKS
- # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+ # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
# internode_compression controls whether traffic between nodes is
# compressed.
# can be: all - all traffic is compressed
# dc - traffic between different datacenters is compressed
# none - nothing is compressed.
+# We have to shut this off to avoid SNAPPY errors when testing
internode_compression: none
# Enable or disable tcp_nodelay for inter-dc communication.
# Disabling it will result in larger (but fewer) network packets being sent,
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
-inter_dc_tcp_nodelay: true
+inter_dc_tcp_nodelay: false
+
+# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
+# When enabled it would preheat only first "page" (4KB) of each row to optimize
+# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
+# for further details on that topic.
+preheat_kernel_page_cache: false
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
index 1ede643..c9b1e39 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
@@ -68,7 +68,7 @@ public class TestUtils {
public static <K, C> void createColumnFamiliy(final Keyspace keyspace, final ColumnFamily<K, C> columnFamily, final Map<String, Object> options){
try{
- keyspace.createColumnFamily( columnFamily, new HashMap<String, Object>() );
+ keyspace.createColumnFamily( columnFamily, options );
}catch(Exception e){
log.error( "Error on creating column family, ignoring" , e);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
index 3588268..7ec37c9 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
@@ -133,7 +133,7 @@ public class HystrixSessionTest {
final String keyspaceCreate =
"CREATE KEYSPACE " + keyspaceName + " WITH REPLICATION = { 'class' : 'SimpleStrategy', "
- + "'replication_factor' : 3 };";
+ + "'replication_factor' : 1 };";
final String createColumnFamily = "CREATE TABLE " + tableName + " ( " +
"entry_id uuid, " +
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..278c866 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -23,6 +23,7 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
import org.apache.usergrid.persistence.core.migration.Migration;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
@@ -135,14 +136,20 @@ public class GraphModule extends AbstractModule {
//do multibindings for migrations
Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
- migrationBinding.addBinding().to( Key.get( NodeSerialization.class ) );
- migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
+
//bind each singleton to the multi set. Otherwise we won't migrate properly
migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
+
+
+ Multibinder<DatastaxMigration>
+ dsMigrationBinding = Multibinder.newSetBinder( binder(), DatastaxMigration.class );
+
+ dsMigrationBinding.addBinding().to( Key.get( NodeSerialization.class ) );
+ dsMigrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index ae38372..246d549 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.graph.impl;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,7 +29,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -50,11 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Observer;
@@ -87,10 +89,11 @@ public class GraphManagerImpl implements GraphManager {
private final GraphFig graphFig;
+ private final Session session;
@Inject
- public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+ public GraphManagerImpl( final Session session, final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization storageEdgeSerialization,
final NodeSerialization nodeSerialization, final GraphFig graphFig,
@Assisted final ApplicationScope scope, final EdgeDeleteListener edgeDeleteListener,
@@ -98,12 +101,14 @@ public class GraphManagerImpl implements GraphManager {
ValidationUtils.validateApplicationScope( scope );
+ Preconditions.checkNotNull( session, "session must not be null" );
Preconditions.checkNotNull( edgeMetadataSerialization, "edgeMetadataSerialization must not be null" );
Preconditions.checkNotNull( storageEdgeSerialization, "storageEdgeSerialization must not be null" );
Preconditions.checkNotNull( nodeSerialization, "nodeSerialization must not be null" );
Preconditions.checkNotNull( graphFig, "consistencyFig must not be null" );
Preconditions.checkNotNull( scope, "scope must not be null" );
+ this.session = session;
this.scope = scope;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.storageEdgeSerialization = storageEdgeSerialization;
@@ -130,13 +135,13 @@ public class GraphManagerImpl implements GraphManager {
final UUID timestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
+ final Collection<? extends Statement> mutation = edgeMetadataSerialization.writeEdge( scope, edge );
- final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+ final Collection<? extends Statement> edgeMutation =
+ storageEdgeSerialization.writeEdge( scope, edge, timestamp );
- mutation.mergeShallow( edgeMutation );
- HystrixCassandra.user( mutation );
+ BatchStatementUtils.runBatches(session, mutation, edgeMutation );
return edge;
}
@@ -158,12 +163,12 @@ public class GraphManagerImpl implements GraphManager {
final UUID timestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+ final Collection<? extends Statement> edgeMutation =
+ storageEdgeSerialization.writeEdge( scope, edge, timestamp );
- LOG.debug( "Marking edge {} as deleted to commit log", edge );
- HystrixCassandra.user( edgeMutation );
+ BatchStatementUtils.runBatches(session, edgeMutation );
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
// timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
@@ -188,11 +193,14 @@ public class GraphManagerImpl implements GraphManager {
final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+ final Collection<? extends Statement> nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+
+
LOG.debug( "Marking node {} as deleted to node mark", node );
- HystrixCassandra.user( nodeMutation );
+
+ BatchStatementUtils.runBatches(session, nodeMutation);
//HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..323db30 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.graph.impl.stage;
+import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
@@ -27,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -36,6 +38,8 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
@@ -56,21 +60,21 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
protected final EdgeSerialization storageSerialization;
protected final GraphFig graphFig;
- protected final Keyspace keyspace;
+ protected final Session session;
@Inject
public EdgeDeleteRepairImpl( final EdgeSerialization storageSerialization,
- final GraphFig graphFig, final Keyspace keyspace ) {
+ final GraphFig graphFig, final Session session ) {
- Preconditions.checkNotNull( "storageSerialization is required", storageSerialization );
- Preconditions.checkNotNull( "consistencyFig is required", graphFig );
- Preconditions.checkNotNull( "keyspace is required", keyspace );
+ Preconditions.checkNotNull( storageSerialization, "storageSerialization is required" );
+ Preconditions.checkNotNull( graphFig, "graphFig is required" );
+ Preconditions.checkNotNull( session, "session is required" );
this.storageSerialization = storageSerialization;
this.graphFig = graphFig;
- this.keyspace = keyspace;
+ this.session = session;
}
@@ -93,8 +97,11 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
//remove from the commit log
+ final Collection<? extends Statement>
+ deletes = storageSerialization.deleteEdge( scope, edge, timestamp );
+
//remove from storage
- HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
+ BatchStatementUtils.runBatches(session, deletes );
}
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 7e09eca..5c6f4a1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.impl.stage;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -41,6 +43,9 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -65,23 +70,22 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
private final EdgeMetadataSerialization edgeMetadataSerialization;
private final EdgeSerialization storageEdgeSerialization;
- private final Keyspace keyspace;
+ private final Session session;
private final GraphFig graphFig;
@Inject
- public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Keyspace keyspace,
+ public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Session session,
final GraphFig graphFig, final EdgeSerialization storageEdgeSerialization ) {
- Preconditions.checkNotNull( "edgeMetadataSerialization is required", edgeMetadataSerialization );
- Preconditions.checkNotNull( "storageEdgeSerialization is required", storageEdgeSerialization );
- Preconditions.checkNotNull( "consistencyFig is required", graphFig );
- Preconditions.checkNotNull( "cassandraConfig is required", graphFig );
- Preconditions.checkNotNull( "keyspace is required", keyspace );
+ Preconditions.checkNotNull( edgeMetadataSerialization, "edgeMetadataSerialization is required" );
+ Preconditions.checkNotNull( storageEdgeSerialization, "storageEdgeSerialization is required" );
+ Preconditions.checkNotNull( graphFig, "graphFig is required" );
+ Preconditions.checkNotNull( session, "session is required" );
this.edgeMetadataSerialization = edgeMetadataSerialization;
- this.keyspace = keyspace;
+ this.session = session;
this.graphFig = graphFig;
this.storageEdgeSerialization = storageEdgeSerialization;
}
@@ -122,8 +126,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
public Observable<Integer> call( final List<String> types ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
+ final BatchStatement batch = BatchStatementUtils.fastStatement();
final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
//for each id type, check if the exist in parallel to increase processing speed
@@ -161,9 +164,11 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
LOG.debug( "No edges with nodeId {}, type {}, "
+ "and subtype {}. Removing subtype.", node,
edgeType, subType );
- batch.mergeShallow( serialization
- .removeEdgeSubType( scope, node, edgeType, subType,
- maxTimestamp ) );
+
+ final Collection<? extends Statement> statements = serialization.removeEdgeSubType(
+ scope, node, edgeType, subType, maxTimestamp );
+
+ batch.addAll( statements );
}
} );
@@ -185,9 +190,9 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
"Executing batch for subtype deletion with " +
"type {}. "
+ "Mutation has {} rows to mutate ",
- edgeType, batch.getRowCount() );
+ edgeType, batch.getStatements().size() );
- HystrixCassandra.async( batch );
+ session.execute( batch );
}
}
@@ -218,7 +223,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType,
maxTimestamp );
- HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
+ BatchStatementUtils.runBatches(session, serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
}
} );
}
@@ -245,14 +250,15 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
/**
* Remove the sub type specified
*/
- MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String edgeType,
- final String subType, final long maxTimestamp );
+ Collection<? extends Statement> removeEdgeSubType(
+ final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+ final long maxTimestamp );
/**
* Remove the edge type
*/
- MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
- final long maxTimestamp );
+ Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+ final String type, final long maxTimestamp );
}
@@ -292,15 +298,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
- public MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String type,
- final String subType, final long maxTimestamp ) {
+ public Collection<? extends Statement> removeEdgeSubType(
+ final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+ final long maxTimestamp ) {
return edgeMetadataSerialization.removeIdTypeToTarget( scope, nodeId, type, subType, maxTimestamp );
}
@Override
- public MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
- final long maxTimestamp ) {
+ public Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+ final String type, final long maxTimestamp ) {
return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, nodeId, type, maxTimestamp );
}
};
@@ -338,15 +345,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
- public MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String type,
- final String subType, final long maxTimestamp ) {
+ public Collection<? extends Statement> removeEdgeSubType(
+ final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+ final long maxTimestamp ) {
return edgeMetadataSerialization.removeIdTypeFromSource( scope, nodeId, type, subType, maxTimestamp );
}
@Override
- public MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
- final long maxTimestamp ) {
+ public Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+ final String type, final long maxTimestamp ) {
return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, nodeId, type, maxTimestamp );
}
};