You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2018/12/29 19:08:11 UTC

[1/3] usergrid git commit: Tool to perform read repair on Cassandra and to repair entities with missing unique value entries. Has an option for a dry run that only detects entities with missing unique value entries.

Repository: usergrid
Updated Branches:
  refs/heads/master 628ad0129 -> 6af6f17a3


Tool to perform read repair on Cassandra and to repair entities with missing unique value entries.
Has an option for a dry run that only detects entities with missing unique value entries.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d12d82c2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d12d82c2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d12d82c2

Branch: refs/heads/master
Commit: d12d82c2b3f31d9d9b4625422f016486a257e245
Parents: abec1d9
Author: Chetan Burse <cb...@google.com>
Authored: Mon Nov 26 13:17:55 2018 -0800
Committer: Chetan Burse <cb...@google.com>
Committed: Thu Dec 13 17:20:53 2018 -0800

----------------------------------------------------------------------
 .../usergrid/tools/UniqueValueRepairer.java     | 448 +++++++++++++++++++
 1 file changed, 448 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12d82c2/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+public class UniqueValueRepairer extends ExportingToolBase {
+
+	static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);
+
+	JsonFactory jsonFactory = new JsonFactory();
+	public static final String LAST_ID = "lastId";
+
+	public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";
+	public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+
+	private boolean findMissingUniqueValues = false;
+	private boolean fixMissingValue = false;
+
+	private AllEntityIdsObservable allEntityIdsObs;
+	private SimpleEdge lastEdge = null;
+
+	private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+	private ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);
+
+	private Session session;
+	private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+	private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+	@Override
+	@SuppressWarnings("static-access")
+	public Options createOptions() {
+
+		Options options = super.createOptions();
+
+		Option findMissingUniqueValues = OptionBuilder
+				.withDescription("Find entities with missing unique value entry  -findMissingUniqueValues")
+				.create(FIND_MISSING_UNIQUE_VALUES);
+		Option fixMissingUniqueValueEntries = OptionBuilder
+				.withDescription("Fix entities with missing unique value entry  -fixUniqueValues")
+				.create(FIX_MISSING_VALUES);
+
+		options.addOption(findMissingUniqueValues);
+		options.addOption(fixMissingUniqueValueEntries);
+
+		return options;
+	}
+
+	@Override
+	public void runTool(CommandLine line) throws Exception {
+
+		startSpring();
+		setVerbose(line);
+
+		this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+		applyInputParams(line);
+
+		mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+		uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+		session = injector.getInstance(Session.class);
+
+		startEntityScan();
+
+		logger.info("Finished checking entities. Waiting for threads to complete execution.");
+
+		while (true) {
+			try {
+				// Spinning to prevent program execution from ending.
+				// Need to replace with some kind of countdown latch or task tracker
+				Thread.sleep(10000);
+			} catch (InterruptedException e) {
+				logger.error("Exception while waiting for unique check to complete.", e);
+			}
+		}
+	}
+
+	private void startEntityScan() throws Exception, UnsupportedEncodingException {
+
+		for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
+
+			// Let's skip the test entities.
+			if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) {
+				continue;
+			}
+			fetchApplicationsForOrgs(organizationName.getKey(), organizationName.getValue());
+		}
+	}
+
+	private Map<UUID, String> getOrgs() throws Exception {
+		// Loop through the organizations
+		Map<UUID, String> organizationNames = null;
+
+		if (orgId == null && (orgName == null || orgName.trim().equals(""))) {
+			organizationNames = managementService.getOrganizations();
+		} else {
+			OrganizationInfo info = null;
+
+			if (orgId != null) {
+				info = managementService.getOrganizationByUuid(orgId);
+			} else {
+				info = managementService.getOrganizationByName(orgName);
+			}
+
+			if (info == null) {
+				logger.error("Organization info is null!");
+				System.exit(1);
+			}
+
+			organizationNames = new HashMap<UUID, String>();
+			organizationNames.put(info.getUuid(), info.getName());
+		}
+
+		return organizationNames;
+	}
+
+	private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+
+		logger.info("Fetch applications for {} : {} ", orgId, orgName);
+
+		// Loop through the applications per organization
+		BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+
+		if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+			// export all apps as appId or name is not provided
+
+			Observable.from(applications.entrySet()).subscribe(appEntry -> {
+				UUID appId = appEntry.getKey();
+				String appName = appEntry.getValue().split("/")[1];
+				try {
+					fetchApplications(appId, appName);
+				} catch (Exception e) {
+					logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+				}
+			});
+
+		} else {
+
+			UUID appId = applicationId;
+			String appName = applicationName;
+
+			if (applicationId != null) {
+				appName = applications.get(appId);
+			} else {
+				appId = applications.inverse().get(orgName + '/' + appName);
+			}
+
+			try {
+				fetchApplications(appId, appName);
+			} catch (Exception e) {
+				logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+			}
+
+		}
+	}
+
+	private void fetchApplications(UUID appId, String appName) throws Exception {
+
+		logger.info("Fetching application for {} : {} ", appName, appId);
+
+		EntityManager em = emf.getEntityManager(appId);
+
+		Set<String> collections = em.getApplicationCollections();
+
+		if (collNames == null || collNames.length <= 0) {
+			logger.info("Please pass collection name ( -collectionName testCollection ) ");
+		} else {
+			Observable.from(collNames).subscribe(collectionName -> {
+				if (collections.contains(collectionName)) {
+					fetchCollections(appId, collectionName, em);
+				}
+			});
+		}
+
+	}
+
+	private void fetchCollections(UUID appId, String collectionName, EntityManager em) {
+		extractEntitiesForCollection(appId, collectionName);
+	}
+
+	private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
+
+		AtomicInteger batch = new AtomicInteger(1);
+
+		final EntityManager rootEm = emf.getEntityManager(applicationId);
+
+		ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
+		allEntityIdsObs
+				.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+						Optional.fromNullable(
+								CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+						(lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+				.buffer(1000).finallyDo(() -> {
+					edgeScopeFetcher.shutdown();
+					logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
+							collectionName);
+					while (!edgeScopeFetcher.isTerminated()) {
+						try {
+							edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
+						} catch (InterruptedException e) {
+						}
+					}
+					logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
+				}).subscribe(edges -> {
+
+					logger.info("For collection {}", collectionName);
+					Integer batchId = batch.getAndIncrement();
+					logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+					Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher)).subscribe(edgeScopes -> {
+
+						List<UUID> entityIds = new ArrayList<UUID>(1000);
+
+						for (EdgeScope edgeScope : edgeScopes) {
+							Id entityId = edgeScope.getEdge().getTargetNode();
+							if (entityId != null) {
+								entityIds.add(entityId.getUuid());
+							} else {
+								edgeScopes.remove(edgeScope);
+							}
+						}
+						try {
+							String type = edgeScopes.get(0).getEdge().getTargetNode().getType();
+
+							Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher)) // change to
+									.subscribe(entIds -> {
+
+										logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(),
+												type, batchId);
+										Results entities = rootEm.getEntities(entIds, type);
+										logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(),
+												type, batchId);
+										try {
+
+											ConnectableObservable<Entity> entityObs = Observable
+													.from(entities.getEntities()).publish();
+											entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
+											entityObs.subscribe(t -> {
+												logger.info("Fetched entity with UUID : {}", t.getUuid());
+												if (findMissingUniqueValues) {
+													String fieldValue = null;
+													//We can search entity with UUID or name/email based on the entity type. 
+													//This mapping between unique value field(name/email etc) and UUID,
+													//is stored in unique value table. This can either be name / email or any other type.
+													//This value is being passed as field type. 
+										            //The code below takes the parameter and retrieves the value of the field using the getter method. 
+													if (fieldType == null || fieldType.equals("")
+															|| fieldType.equals("name")) {
+														fieldType = "name";
+														fieldValue = t.getName();
+													} else {
+														try {
+															Method method = t.getClass()
+																	.getMethod("get"
+																			+ fieldType.substring(0, 1).toUpperCase()
+																			+ fieldType.substring(1));
+															fieldValue = (String) method.invoke(t);
+														} catch (Exception e1) {
+															logger.error(
+																	"Exception while trying to fetch field value of type {} for entity {} batch {}",
+																	fieldType, t.getUuid(), batchId, e1);
+														}
+													}
+													try {
+														if (fieldValue != null) {
+
+															Entity e = rootEm.getUniqueEntityFromAlias(t.getType(),
+																	fieldValue, false);
+
+															if (e == null) {
+																logger.info(
+																		"No entity found for field type {} and field value {} but exists for UUID {}",
+																		fieldType, fieldValue, t.getUuid());
+																if (fixMissingValue) {
+																	logger.info(
+																			"Trying to repair unique value mapping for {} ",
+																			t.getUuid());
+																	UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy
+																			.load(new ApplicationScopeImpl(new SimpleId(
+																					applicationId, "application")),
+																					ConsistencyLevel
+																							.valueOf(System.getProperty(
+																									"usergrid.read.cl",
+																									"LOCAL_QUORUM")),
+																					t.getType(),
+																					Collections.singletonList(
+																							new StringField(fieldType,
+																									fieldValue)),
+																					false);
+
+																	ApplicationScope applicationScope = new ApplicationScopeImpl(
+																			new SimpleId(applicationId, "application"));
+																	com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy
+																			.load(applicationScope, new SimpleId(
+																					t.getUuid(), t.getType()));
+
+																	if (!entity.isPresent()
+																			|| !entity.get().getEntity().isPresent()) {
+																		throw new RuntimeException(
+																				"Unable to update unique value index because supplied UUID "
+																						+ t.getUuid()
+																						+ " does not exist");
+																	}
+																	logger.info("Delete unique value: {}",
+																			uniqueValueSet.getValue(fieldType));
+																	try {
+																		session.execute(uniqueValueSerializationStrategy
+																				.deleteCQL(applicationScope,
+																						uniqueValueSet
+																								.getValue(fieldType)));
+																	} catch (Exception ex) {
+																		logger.error(
+																				"Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
+																				t.getUuid(), ex);
+																	}
+
+																	UniqueValue newUniqueValue = new UniqueValueImpl(
+																			new StringField(fieldType, fieldValue),
+																			entity.get().getId(),
+																			entity.get().getVersion());
+																	logger.info("Writing new unique value: {}",
+																			newUniqueValue);
+																	session.execute(uniqueValueSerializationStrategy
+																			.writeCQL(applicationScope, newUniqueValue,
+																					-1));
+																}
+
+															} else {
+																logger.info(
+																		"Found entity {} for field type {} and field value {}",
+																		e.getUuid(), fieldType, fieldValue);
+															}
+														} else {
+															logger.info("No value found for field {} for entity {}",
+																	fieldType, t.getUuid());
+														}
+													} catch (Exception e) {
+														logger.error(
+																"Error while checking unique values for batch id : {} for entity {}",
+																batchId, t.getUuid(), e);
+													}
+												}
+											});
+											entityObs.connect();
+
+										} catch (Exception e) {
+											logger.error(
+													"Error while checking unique values for batch id : {} for collection {}",
+													batchId, collectionName, e);
+										}
+									});
+
+						} catch (Exception e) {
+							logger.error("Exception while traversing entities " + edgeScopes.get(0).getEdge(), e);
+							System.exit(0);
+						}
+					});
+					logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
+				});
+		logger.info("Exiting extractEntitiesForCollection() method.");
+	}
+
+	protected void applyInputParams(CommandLine line) {
+
+		if (line.hasOption(ORG_ID)) {
+			orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+		} else if (line.hasOption(ORG_NAME)) {
+			orgName = line.getOptionValue(ORG_NAME);
+		}
+
+		if (line.hasOption(APP_ID)) {
+			applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
+		} else if (line.hasOption(APP_NAME)) {
+			applicationName = line.getOptionValue(APP_NAME);
+		}
+		if (line.hasOption(COLL_NAMES)) {
+			collNames = line.getOptionValue(COLL_NAMES).split(",");
+		}
+		if (line.hasOption(COLLECTION_NAME)) {
+			collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
+		}
+		findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+		fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+	}
+}


[2/3] usergrid git commit: Merge pull request #1 from bursech/uniqueValueRepaireWork

Posted by mr...@apache.org.
Merge pull request #1 from bursech/uniqueValueRepaireWork

Tool to perform read repair on Cassandra and to repair entities with missing unique value entries.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/08f8dd50
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/08f8dd50
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/08f8dd50

Branch: refs/heads/master
Commit: 08f8dd500cb65991a8d102a39f68706f8e49a015
Parents: abec1d9 d12d82c
Author: keyurkarnik <42...@users.noreply.github.com>
Authored: Thu Dec 13 17:28:32 2018 -0800
Committer: GitHub <no...@github.com>
Committed: Thu Dec 13 17:28:32 2018 -0800

----------------------------------------------------------------------
 .../usergrid/tools/UniqueValueRepairer.java     | 448 +++++++++++++++++++
 1 file changed, 448 insertions(+)
----------------------------------------------------------------------



[3/3] usergrid git commit: Merge commit 'refs/pull/617/head' of github.com:apache/usergrid

Posted by mr...@apache.org.
Merge commit 'refs/pull/617/head' of github.com:apache/usergrid


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6af6f17a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6af6f17a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6af6f17a

Branch: refs/heads/master
Commit: 6af6f17a333e9f0edd3ca9212164befea596f8e5
Parents: 628ad01 08f8dd5
Author: Michael Russo <ru...@google.com>
Authored: Sat Dec 29 11:08:03 2018 -0800
Committer: Michael Russo <ru...@google.com>
Committed: Sat Dec 29 11:08:03 2018 -0800

----------------------------------------------------------------------
 .../usergrid/tools/UniqueValueRepairer.java     | 448 +++++++++++++++++++
 1 file changed, 448 insertions(+)
----------------------------------------------------------------------