You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/03/31 07:44:47 UTC
[1/4] usergrid git commit: Initial commit of export application API.
Repository: usergrid
Updated Branches:
refs/heads/master ea1ba360d -> 8b63aae7d
Initial commit of export application API.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82e7ec57
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82e7ec57
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82e7ec57
Branch: refs/heads/master
Commit: 82e7ec57bef24c8b75d3f3479952522fdd916bfa
Parents: ea1ba36
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 00:01:38 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 00:01:38 2017 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 4 +
.../export/ExportRequestBuilder.java | 47 ++++
.../export/ExportRequestBuilderImpl.java | 65 +++++
.../corepersistence/export/ExportService.java | 49 ++++
.../export/ExportServiceImpl.java | 235 +++++++++++++++++++
.../util/ObjectJsonSerializer.java | 17 ++
.../rest/applications/ApplicationResource.java | 48 ++++
7 files changed, 465 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ef4bb04..af297f2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -20,6 +20,8 @@ import com.google.inject.*;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
import org.apache.usergrid.corepersistence.asyncevents.*;
+import org.apache.usergrid.corepersistence.export.ExportService;
+import org.apache.usergrid.corepersistence.export.ExportServiceImpl;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.migration.CoreMigration;
import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -137,6 +139,8 @@ public class CoreModule extends AbstractModule {
bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+ bind( ExportService.class ).to( ExportServiceImpl.class );
+
install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
.build( AggregationServiceFactory.class ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
new file mode 100644
index 0000000..71c64f6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our export request
+ */
+public interface ExportRequestBuilder {
+
+ /**
+ * Set the application id
+ */
+ ExportRequestBuilder withApplicationId(final UUID applicationId);
+
+
+ /**
+ * Get the application scope
+ * @return
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
new file mode 100644
index 0000000..73fcec4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Export service request builder
+ */
+public class ExportRequestBuilderImpl implements ExportRequestBuilder {
+
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> updateTimestamp = Optional.absent();
+ private Optional<Integer> delayTimer = Optional.absent();
+ private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+ /***
+ *
+ * @param applicationId The application id
+ * @return
+ */
+ @Override
+ public ExportRequestBuilder withApplicationId(final UUID applicationId ) {
+ this.withApplicationId = Optional.fromNullable( applicationId );
+ return this;
+ }
+
+ @Override
+ public Optional<ApplicationScope> getApplicationScope() {
+
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+ }
+
+ return Optional.absent();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
new file mode 100644
index 0000000..7615448
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An interface for exporting all entities within an application
+ */
+public interface ExportService {
+
+
+ /**
+ * Perform an application export
+ *
+ * @param exportRequestBuilder The builder to build the request
+ */
+ void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException;
+
+
+ /**
+ * Generate a builder for the export request
+ */
+ ExportRequestBuilder getBuilder();
+
+
+ enum Status{
+ STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
new file mode 100644
index 0000000..ebbcc58
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+import java.io.*;
+import java.util.Map;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+
+@Singleton
+public class ExportServiceImpl implements ExportService {
+
+ private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
+ private static final MapScope RESUME_MAP_SCOPE =
+ new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "export-status" );
+
+
+ private static final String MAP_COUNT_KEY = "count";
+ private static final String MAP_STATUS_KEY = "status";
+ private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+ private final AllEntityIdsObservable allEntityIdsObservable;
+ private final MapManager mapManager;
+ private final MapManagerFactory mapManagerFactory;
+ private final CollectionSettingsFactory collectionSettingsFactory;
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final ManagerCache managerCache;
+
+ ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+
+
+
+ @Inject
+ public ExportServiceImpl(final AllEntityIdsObservable allEntityIdsObservable,
+ final ManagerCache managerCache,
+ final MapManagerFactory mapManagerFactory,
+ final CollectionSettingsFactory collectionSettingsFactory,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory) {
+ this.allEntityIdsObservable = allEntityIdsObservable;
+ this.collectionSettingsFactory = collectionSettingsFactory;
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.mapManagerFactory = mapManagerFactory;
+ this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+ this.managerCache = managerCache;
+ }
+
+
+ @Override
+ public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException {
+
+ final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
+
+ //final AtomicInteger count = new AtomicInteger();
+ final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
+ final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
+
+
+ //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
+
+ final String rootPath = appScope.getApplication().getUuid().toString();
+
+ GraphManager gm = managerCache.getGraphManager( appScope );
+
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
+ .doOnNext( edgeScope -> {
+
+ try {
+
+ // load the entity and convert to a normal map
+ Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+ Map entityMap = CpEntityMapUtils.toMap(entity);
+
+ if (entity != null) {
+ final String filenameWithPath = rootPath + "/" +
+ edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
+ edgeScope.getEdge().getType() + "_" +
+ edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
+
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+
+ logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+ zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
+
+ } else {
+ logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
+ }
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ }
+
+ //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
+ })
+ .flatMap( edgeScope -> {
+
+ // find all connection types for the each entity emitted from the app
+ return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+ .flatMap(emittedEdgeType -> {
+
+ logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+ return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
+ emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+
+ }).doOnNext( markedEdge -> {
+
+ if (!markedEdge.isDeleted()){
+
+ // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
+ Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+ Map entityMap = CpEntityMapUtils.toMap(entity);
+
+ try {
+ final String filenameWithPath = rootPath + "/" +
+ markedEdge.getSourceNode().getUuid().toString() + "_" +
+ markedEdge.getType() + "_" +
+ markedEdge.getTargetNode().getUuid().toString() + ".json";
+
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+ logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+ zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+ }
+ }
+
+ });
+
+ })
+ .doOnCompleted(() -> {
+
+ //writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() );
+ try {
+ logger.debug("closing zip stream");
+ zipOutputStream.close();
+
+ } catch (IOException e) {
+ logger.error( "unable to close zip stream");
+ }
+
+ })
+ .subscribeOn( Schedulers.io() ).toBlocking().lastOrDefault(null);
+ }
+
+
+ @Override
+ public ExportRequestBuilder getBuilder() {
+ return new ExportRequestBuilderImpl();
+ }
+
+
+
+
+ /**
+ * Write our state meta data into cassandra so everyone can see it
+ * @param jobId
+ * @param status
+ * @param processedCount
+ * @param lastUpdated
+ */
+ private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+ final long lastUpdated ) {
+
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+ jobId, status, processedCount, lastUpdated);
+ }
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+ mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index 4e5873a..b704afa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -77,6 +77,23 @@ public final class ObjectJsonSerializer {
return stringValue;
}
+ public <T> String toString( final T toSerialize ) {
+
+ Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+ final String stringValue;
+ //mark this version as empty
+
+ //Convert to internal entity map
+ try {
+ stringValue = MAPPER.writeValueAsString( toSerialize );
+ }
+ catch ( JsonProcessingException jpe ) {
+ throw new RuntimeException( "Unable to serialize entity", jpe );
+ }
+
+ return stringValue;
+ }
+
public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 9836f1c..920287b 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -25,6 +25,11 @@ import org.apache.amber.oauth2.common.message.OAuthResponse;
import org.apache.amber.oauth2.common.message.types.GrantType;
import org.apache.shiro.authz.UnauthorizedException;
import org.apache.shiro.codec.Base64;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilder;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.export.ExportService;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
import org.apache.usergrid.management.exceptions.DisabledAppUserException;
@@ -37,6 +42,7 @@ import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.applications.assets.AssetsResource;
@@ -58,6 +64,8 @@ import org.springframework.stereotype.Component;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
@@ -676,4 +684,44 @@ public class ApplicationResource extends CollectionResource {
}
+ private ExportService getExportService() {
+ return injector.getInstance( ExportService.class );
+ }
+
+
+ @GET
+ @Path("export")
+ @RequireApplicationAccess
+ @Produces({"application/zip"})
+ public Response getExport( @Context UriInfo ui,
+ @QueryParam("callback") @DefaultValue("callback") String callback )
+ throws Exception {
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("ApplicationResource.getExport");
+ }
+
+ if ( !isApplicationAdmin( Identifier.fromUUID( applicationId ) ) ) {
+ throw new UnauthorizedException();
+ }
+
+
+ final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
+ StreamingOutput stream = new StreamingOutput() {
+ @Override
+ public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+ getExportService().export(request,outputStream);
+ }
+ };
+ return Response
+ .ok(stream)
+ .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+ .build();
+ }
+
+
+
+
+
+
}
[2/4] usergrid git commit: Batch write to the export stream and use a
cleaner filename.
Posted by mr...@apache.org.
Batch write to the export stream and use a cleaner filename.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d56d813
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d56d813
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d56d813
Branch: refs/heads/master
Commit: 2d56d813a9d99295ef72ab055e8600f62df364e9
Parents: 82e7ec5
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 18:48:49 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 18:48:49 2017 -0700
----------------------------------------------------------------------
.../export/ExportServiceImpl.java | 153 ++++++++++++-------
.../rest/applications/ApplicationResource.java | 4 +-
2 files changed, 104 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index ebbcc58..47d6982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -48,7 +48,9 @@ import rx.schedulers.Schedulers;
import java.io.*;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -98,91 +100,138 @@ public class ExportServiceImpl implements ExportService {
final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
- //final AtomicInteger count = new AtomicInteger();
final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
-
- //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
- final String rootPath = appScope.getApplication().getUuid().toString();
-
GraphManager gm = managerCache.getGraphManager( appScope );
+ final AtomicInteger entityFileCount = new AtomicInteger();
+ final AtomicInteger connectionFileCount = new AtomicInteger();
+
allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
- .doOnNext( edgeScope -> {
+ .buffer( 1000 )
+ .doOnNext( edgeScopes -> {
+
try {
- // load the entity and convert to a normal map
- Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
- Map entityMap = CpEntityMapUtils.toMap(entity);
+ final String filenameWithPath = "entities/" +
+ "entities."+ entityFileCount.get() + ".json";
+
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+ edgeScopes.forEach( edgeScope -> {
+
+
+ try {
+ // load the entity and convert to a normal map
+ Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+ Map entityMap = CpEntityMapUtils.toMap(entity);
+
+ if (entity != null) {
+
- if (entity != null) {
- final String filenameWithPath = rootPath + "/" +
- edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
- edgeScope.getEdge().getType() + "_" +
- edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+ logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+ zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+ zipOutputStream.write("\n".getBytes());
+
+ } else {
+ logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
+ }
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ }
- logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
- zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
- zipOutputStream.closeEntry();
- zipOutputStream.flush();
- } else {
- logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
- }
+ entityFileCount.addAndGet(1);
+
+ });
+
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
} catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ logger.warn("Unable to create entry in zip export for batch entities");
}
//writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
})
- .flatMap( edgeScope -> {
+ .doOnNext( edgeScopes -> {
- // find all connection types for the each entity emitted from the app
- return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
- .flatMap(emittedEdgeType -> {
+ try{
- logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
- return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
- emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+ final String filenameWithPath = "connections/" +
+ "connections." + connectionFileCount.get() + ".json";
- }).doOnNext( markedEdge -> {
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- if (!markedEdge.isDeleted()){
+ edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+ .flatMap(emittedEdgeType -> {
- // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
- Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+ logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+ return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+ emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
- Map entityMap = CpEntityMapUtils.toMap(entity);
+ })
- try {
- final String filenameWithPath = rootPath + "/" +
- markedEdge.getSourceNode().getUuid().toString() + "_" +
- markedEdge.getType() + "_" +
- markedEdge.getTargetNode().getUuid().toString() + ".json";
+ .buffer( 1000 )
+ .doOnNext(markedEdges -> {
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
- zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
- zipOutputStream.closeEntry();
- zipOutputStream.flush();
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
- }
- }
+ markedEdges.forEach( markedEdge -> {
- });
+ if (!markedEdge.isDeleted()) {
+
+ // load the target entity again to make sure bad connections are not exported
+ Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+ if (entity != null) {
+
+ try {
+
+ final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+ put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+ put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+ put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+ }};
+
+ logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+ zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
+ zipOutputStream.write("\n".getBytes());
+
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+ }
+ } else {
+ logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+ }
+ }
+
+ });
+
+
+
+
+ }).toBlocking().lastOrDefault(null));
+
+ connectionFileCount.addAndGet(1);
+
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
+
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for batch connections");
+ }
})
.doOnCompleted(() -> {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 920287b..7479a90 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.export.ExportService;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
import org.apache.usergrid.management.exceptions.DisabledAppUserException;
import org.apache.usergrid.management.exceptions.UnactivatedAdminUserException;
@@ -705,6 +706,7 @@ public class ApplicationResource extends CollectionResource {
throw new UnauthorizedException();
}
+ ApplicationInfo appInfo = management.getApplicationInfo(applicationId);
final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
StreamingOutput stream = new StreamingOutput() {
@@ -715,7 +717,7 @@ public class ApplicationResource extends CollectionResource {
};
return Response
.ok(stream)
- .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+ .header("Content-Disposition", "attachment; filename=\""+appInfo.getName().replace("/","_")+"_"+System.currentTimeMillis()+".zip\"")
.build();
}
[3/4] usergrid git commit: Update new ExportService to better stream
out the results directly from the database. Added tests to validate
Posted by mr...@apache.org.
Update new ExportService to better stream out the results directly from the database. Added tests to validate
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b37fc81e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b37fc81e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b37fc81e
Branch: refs/heads/master
Commit: b37fc81e984c43310bb8590a606ce639312e24b2
Parents: 2d56d81
Author: Michael Russo <ru...@google.com>
Authored: Thu Mar 30 19:53:12 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Thu Mar 30 19:53:12 2017 -0700
----------------------------------------------------------------------
.../corepersistence/export/ExportService.java | 6 +-
.../export/ExportServiceImpl.java | 245 +++++++--------
.../corepersistence/export/ExportServiceIT.java | 122 +++++++
.../MvccEntitySerializationStrategyV3Impl.java | 8 +-
.../exceptions/AbstractExceptionMapper.java | 3 +-
.../organizations/OrganizationResource.java | 190 +++++------
.../applications/ApplicationResource.java | 314 +++++++++----------
.../notifications/NotificationsService.java | 13 +-
.../apns/NotificationsServiceIT.java | 49 ++-
9 files changed, 558 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
index 7615448..0c7789b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
@@ -30,15 +30,15 @@ public interface ExportService {
/**
- * Perform an application export
+ * Perform an application export into the provided OutputStream
*
* @param exportRequestBuilder The builder to build the request
*/
- void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException;
+ void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException;
/**
- * Generate a build for the index
+ * Generate a builder for the export request
*/
ExportRequestBuilder getBuilder();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index 47d6982..e561ac6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.serialization.impl.EntitySetImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -41,6 +43,8 @@ import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.InflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -48,7 +52,9 @@ import rx.schedulers.Schedulers;
import java.io.*;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
@@ -58,47 +64,32 @@ import java.util.zip.ZipOutputStream;
@Singleton
public class ExportServiceImpl implements ExportService {
- private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
-
- private static final MapScope RESUME_MAP_SCOPE =
- new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "export-status" );
-
-
- private static final String MAP_COUNT_KEY = "count";
- private static final String MAP_STATUS_KEY = "status";
- private static final String MAP_UPDATED_KEY = "lastUpdated";
-
+ private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
private final AllEntityIdsObservable allEntityIdsObservable;
- private final MapManager mapManager;
- private final MapManagerFactory mapManagerFactory;
- private final CollectionSettingsFactory collectionSettingsFactory;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final ManagerCache managerCache;
-
- ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+ private final ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+ private final int exportVersion = 1;
+ private final String keyTotalEntityCount = "__totalEntityCount__";
@Inject
public ExportServiceImpl(final AllEntityIdsObservable allEntityIdsObservable,
final ManagerCache managerCache,
- final MapManagerFactory mapManagerFactory,
- final CollectionSettingsFactory collectionSettingsFactory,
final EntityCollectionManagerFactory entityCollectionManagerFactory) {
this.allEntityIdsObservable = allEntityIdsObservable;
- this.collectionSettingsFactory = collectionSettingsFactory;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.mapManagerFactory = mapManagerFactory;
- this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
this.managerCache = managerCache;
}
@Override
- public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException {
+ public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException {
final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
+ zipOutputStream.setLevel(9); // best compression to reduce the amount of data to stream over the wire
final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
@@ -107,142 +98,172 @@ public class ExportServiceImpl implements ExportService {
GraphManager gm = managerCache.getGraphManager( appScope );
- final AtomicInteger entityFileCount = new AtomicInteger();
- final AtomicInteger connectionFileCount = new AtomicInteger();
+ final AtomicInteger entityFileCount = new AtomicInteger(); // entities are batched into files
+ final AtomicInteger connectionCount = new AtomicInteger();
+ final Map<String, AtomicInteger> collectionStats = new HashMap<>();
+ collectionStats.put(keyTotalEntityCount, new AtomicInteger());
+ final Map<String, Object> infoMap = new HashMap<>();
+ infoMap.put("application", appScope.getApplication().getUuid().toString());
+ infoMap.put("exportVersion", exportVersion);
+ infoMap.put("exportStarted", System.currentTimeMillis());
+
+ logger.info("Starting export of application: {}", appScope.getApplication().getUuid().toString());
allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
- .buffer( 1000 )
- .doOnNext( edgeScopes -> {
+ .buffer(500)
+ .map( edgeScopes -> {
+ List<Id> entityIds = new ArrayList<>();
+ edgeScopes.forEach( edgeScope -> {
+ if (edgeScope.getEdge().getTargetNode() != null) {
+ logger.debug("adding entity to list: {}", edgeScope.getEdge().getTargetNode());
+ entityIds.add(edgeScope.getEdge().getTargetNode());
+ }
+ });
- try {
+ return entityIds;
- final String filenameWithPath = "entities/" +
- "entities."+ entityFileCount.get() + ".json";
+ })
+ .flatMap( entityIds -> {
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+ logger.debug("entityIds: {}", entityIds);
- edgeScopes.forEach( edgeScope -> {
+ // batch load the entities
+ EntitySet entitySet = ecm.load(entityIds).toBlocking().lastOrDefault(new EntitySetImpl(0));
+
+ final String filenameWithPath = "entities/entities." + entityFileCount.get() + ".json";
+
+ try {
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- try {
- // load the entity and convert to a normal map
- Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
- Map entityMap = CpEntityMapUtils.toMap(entity);
+ entitySet.getEntities().forEach(mvccEntity -> {
- if (entity != null) {
+ if (mvccEntity.getEntity().isPresent()) {
+ Map entityMap = CpEntityMapUtils.toMap(mvccEntity.getEntity().get());
+ try {
+ collectionStats.putIfAbsent(mvccEntity.getId().getType(), new AtomicInteger());
+ collectionStats.get(mvccEntity.getId().getType()).incrementAndGet();
+ collectionStats.get(keyTotalEntityCount).incrementAndGet();
- logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+ logger.debug("writing and flushing entity {} to zip stream for file: {}", mvccEntity.getId().getUuid().toString(), filenameWithPath);
zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
zipOutputStream.write("\n".getBytes());
+ zipOutputStream.flush(); // entities can be large, flush after each
- } else {
- logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
- }
+ } catch (IOException e) {
+ logger.warn("unable to write entry in zip stream for entityId: {}", mvccEntity.getId());
+ throw new RuntimeException("Unable to export data. Error writing to stream.");
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ }
+ } else {
+ logger.warn("entityId {} did not have corresponding entity, not writing", mvccEntity.getId());
}
-
-
- entityFileCount.addAndGet(1);
-
});
zipOutputStream.closeEntry();
- zipOutputStream.flush();
+ entityFileCount.incrementAndGet();
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for batch entities");
+ }catch (IOException e){
+ throw new RuntimeException("Unable to export data. Error writing to stream.");
}
- //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
- })
- .doOnNext( edgeScopes -> {
-
- try{
+ return Observable.from(entitySet.getEntities());
- final String filenameWithPath = "connections/" +
- "connections." + connectionFileCount.get() + ".json";
-
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+ })
+ .doOnNext( mvccEntity -> {
- edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+ gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(mvccEntity.getId()))
.flatMap(emittedEdgeType -> {
- logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
- return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+ logger.debug("loading edges of type {} from node {}", emittedEdgeType, mvccEntity.getId());
+ return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(mvccEntity.getId(),
emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
})
+ .doOnNext(markedEdge -> {
- .buffer( 1000 )
- .doOnNext(markedEdges -> {
+ if (!markedEdge.isDeleted() && !markedEdge.isTargetNodeDeleted() && markedEdge.getTargetNode() != null ) {
+ // doing the load to just again make sure bad connections are not exported
+ Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+ if (entity != null) {
- markedEdges.forEach( markedEdge -> {
+ try {
+ // since a single stream is being written, and connections are loaded per entity,
+ // they cannot easily be batched evenly into files, so write them separately
+ final String filenameWithPath = "connections/" +
+ markedEdge.getSourceNode().getUuid().toString()+"_" +
+ CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) + "_" +
+ markedEdge.getTargetNode().getUuid().toString() + ".json";
- if (!markedEdge.isDeleted()) {
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- // doing the load to just again make sure bad connections are not exported
- Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
- if (entity != null) {
+ final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+ put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+ put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+ put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+ }};
- try {
+ logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+ zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
- final Map<String,String> connectionMap = new HashMap<String,String>(1){{
- put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
- put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
- put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
- }};
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
- logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
- zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
- zipOutputStream.write("\n".getBytes());
+ connectionCount.incrementAndGet();
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
- }
- } else {
- logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
- }
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+ throw new RuntimeException("Unable to export data. Error writing to stream.");
}
+ } else {
+ logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+ }
+ }
- });
-
-
+ }).toBlocking().lastOrDefault(null);
+ })
+ .doOnCompleted(() -> {
+ infoMap.put("exportFinished", System.currentTimeMillis());
- }).toBlocking().lastOrDefault(null));
- connectionFileCount.addAndGet(1);
+ try {
+ zipOutputStream.putNextEntry(new ZipEntry("metadata.json"));
+ zipOutputStream.write(jsonSerializer.toString(infoMap).getBytes());
zipOutputStream.closeEntry();
- zipOutputStream.flush();
+ zipOutputStream.putNextEntry(new ZipEntry("stats.json"));
+ Map<String, Object> stats = new HashMap<>();
+ stats.put("totalEntities", collectionStats.get(keyTotalEntityCount).get());
+ stats.put("totalConnections", connectionCount.get());
+ collectionStats.remove(keyTotalEntityCount);
+ stats.put("collectionCounts", new HashMap<String, Integer>(collectionStats.size()){{
+ collectionStats.forEach( (collection,count) -> {
+ put(InflectionUtils.pluralize(collection),count.get());
+ });
+ }});
+ zipOutputStream.write(jsonSerializer.toString(stats).getBytes());
+ zipOutputStream.closeEntry();
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for batch connections");
- }
-
- })
- .doOnCompleted(() -> {
-
- //writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() );
- try {
logger.debug("closing zip stream");
zipOutputStream.close();
+ logger.info("Finished export of application: {}", appScope.getApplication().getUuid().toString());
+
+
} catch (IOException e) {
- logger.error( "unable to close zip stream");
+ throw new RuntimeException("Unable to export data due to inability to close zip stream.");
}
})
@@ -255,30 +276,6 @@ public class ExportServiceImpl implements ExportService {
return new ExportRequestBuilderImpl();
}
-
-
-
- /**
- * Write our state meta data into cassandra so everyone can see it
- * @param jobId
- * @param status
- * @param processedCount
- * @param lastUpdated
- */
- private void writeStateMeta( final String jobId, final Status status, final long processedCount,
- final long lastUpdated ) {
-
- if(logger.isDebugEnabled()) {
- logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
- jobId, status, processedCount, lastUpdated);
- }
-
- mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
- mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
- mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
new file mode 100644
index 0000000..7412ef0
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+import com.google.inject.Injector;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+
+import static org.junit.Assert.assertTrue;
+
+
+public class ExportServiceIT extends AbstractCoreIT {
+
+
+ @Test
+ public void testExport() throws Exception {
+
+ Injector injector = SpringResource.getInstance().getBean(Injector.class);
+
+ ExportService exportService = injector.getInstance(ExportService.class);
+
+ final EntityManager em = app.getEntityManager();
+
+ // create two types of entities
+
+ final String type1 = "type1thing";
+ final String type2 = "type2thing";
+ final int size = 1;
+
+ final Set<Id> type1Identities = EntityWriteHelper.createTypes( em, type1, size );
+ final Set<Id> type2Identities = EntityWriteHelper.createTypes( em, type2, size );
+
+ // connect the first type1 entity to all type2 entities
+
+ final Id source = type1Identities.iterator().next();
+ final Set<Id> connections = new HashSet<>();
+
+ for ( Id target : type2Identities ) {
+ em.createConnection( SimpleEntityRef.fromId( source ),
+ "likes", SimpleEntityRef.fromId( target ) );
+ connections.add( target );
+ }
+
+ ExportRequestBuilder builder = new ExportRequestBuilderImpl().withApplicationId(app.getId());
+
+ // fill the output stream
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ exportService.export(builder, stream);
+
+ // convert the output stream to an input stream and read it as a zip
+ InputStream inputStream = new ByteArrayInputStream(stream.toByteArray());
+ ZipInputStream zip = new ZipInputStream(inputStream);
+
+
+ boolean entityEntryExists = false;
+ boolean connectionEntryExists = false;
+ boolean statsEntryExists = false;
+ boolean metaEntryExists = false;
+
+ final String entityFile = "entities/entities.0.json";
+ final String connectionFile = "connections/"+source.getUuid().toString()+"_"+"likes"+"_"+connections.iterator().next().getUuid().toString()+".json";
+ final String statsFile = "stats.json";
+ final String metaFile = "metadata.json";
+
+ ZipEntry zipEntry;
+ while ( (zipEntry = zip.getNextEntry()) != null ) {
+
+ final String name = zipEntry.getName();
+
+ if (name.equals(entityFile)) {
+ entityEntryExists = true;
+ }
+ if(name.equals(connectionFile)){
+ connectionEntryExists = true;
+ }
+ if(name.equals(statsFile)){
+ statsEntryExists = true;
+ }
+ if(name.equals(metaFile)){
+ metaEntryExists = true;
+ }
+
+ }
+
+ assertTrue("Expected zip entries are missing", entityEntryExists && connectionEntryExists && statsEntryExists && metaEntryExists);
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 54702a8..438f731 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -146,11 +146,13 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
for ( final Id entityId : entityIds ) {
- final ScopedRowKey<Id> rowKey =
- ScopedRowKey.fromKey( applicationId, entityId );
+ if ( entityId != null ) { // the size of entityIds is checked as a precondition, but the individual values are not
+ final ScopedRowKey<Id> rowKey =
+ ScopedRowKey.fromKey(applicationId, entityId);
- rowKeys.add( rowKey );
+ rowKeys.add(rowKey);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
index 807e0c9..8319ff5 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
@@ -71,8 +71,7 @@ public abstract class AbstractExceptionMapper<E extends java.lang.Throwable> imp
if ( status >= 500 ) {
// only log real errors as errors
- logger.error( "{} 5XX Uncaught Exception ({}), {}", e.getClass().getCanonicalName(), status, e );
-
+ logger.error( "{} 5XX Uncaught Exception ({})", e.getClass().getCanonicalName(), status, e );
} else {
if (logger.isDebugEnabled()) {
logger.debug( "{} Following Exception Thrown ({}), {}", e.getClass().getCanonicalName(), status, e );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
index a12e484..7411ea4 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
@@ -66,8 +66,8 @@ public class OrganizationResource extends AbstractContextResource {
private static final Logger logger = LoggerFactory.getLogger( OrganizationsResource.class );
- @Autowired
- protected ExportService exportService;
+ //@Autowired
+ //protected ExportService exportService;
OrganizationInfo organization;
@@ -281,99 +281,99 @@ public class OrganizationResource extends AbstractContextResource {
return response;
}
- @POST
- @Path("export")
- @Consumes(APPLICATION_JSON)
- @RequireOrganizationAccess
- public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
- @QueryParam("callback") @DefaultValue("") String callback )
- throws OAuthSystemException {
-
- if (logger.isTraceEnabled()) {
- logger.trace("executePostJson");
- }
-
- Map<String, String> uuidRet = new HashMap<>();
-
- try {
- Object propertiesObj = json.get("properties");
- if (propertiesObj == null) {
- throw new NullArgumentException("Could not find 'properties'");
- }
- if (!(propertiesObj instanceof Map)) {
- throw new IllegalArgumentException("'properties' not a map");
- }
-
- @SuppressWarnings("unchecked")
- Map<String,Object> properties = (Map<String,Object>)propertiesObj;
-
- String storage_provider = ( String ) properties.get( "storage_provider" );
- if(storage_provider == null) {
- throw new NullArgumentException( "Could not find field 'storage_provider'" );
- }
-
- Object storageInfoObj = properties.get("storage_info");
- if(storageInfoObj == null) {
- throw new NullArgumentException( "Could not find field 'storage_info'" );
- }
- @SuppressWarnings("unchecked")
- Map<String,Object> storage_info = (Map<String, Object>)storageInfoObj;
-
- String bucketName = ( String ) storage_info.get( "bucket_location" );
- String accessId = ( String ) storage_info.get( "s3_access_id" );
- String secretKey = ( String ) storage_info.get( "s3_key" );
-
- if ( bucketName == null ) {
- throw new NullArgumentException( "Could not find field 'bucketName'" );
- }
- if ( accessId == null ) {
- throw new NullArgumentException( "Could not find field 's3_access_id'" );
- }
- if ( secretKey == null ) {
-
- throw new NullArgumentException( "Could not find field 's3_key'" );
- }
-
- json.put( "organizationId",organization.getUuid());
-
- UUID jobUUID = exportService.schedule( json );
- uuidRet.put( "Export Entity", jobUUID.toString() );
- }
- catch ( NullArgumentException e ) {
- return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
- }
- catch ( Exception e ) {
- //TODO:throw descriptive error message and or include on in the response
- //TODO:fix below, it doesn't work if there is an exception. Make it look like the OauthResponse.
- return Response.status( SC_INTERNAL_SERVER_ERROR ).type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
- }
- return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
- }
-
- @GET
- @RequireOrganizationAccess
- @Path("export/{exportEntity: [A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}}")
- public Response exportGetJson( @Context UriInfo ui, @PathParam("exportEntity") UUID exportEntityUUIDStr,
- @QueryParam("callback") @DefaultValue("") String callback ) throws Exception {
-
- Export entity;
- try {
- entity = smf.getServiceManager( emf.getManagementAppId() ).getEntityManager()
- .get( exportEntityUUIDStr, Export.class );
- }
- catch ( Exception e ) { //this might not be a bad request and needs better error checking
- return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
- }
-
- if ( entity == null ) {
- return Response.status( SC_BAD_REQUEST ).build();
- }
-
- return Response.status( SC_OK ).entity( entity).build();
- }
+// @POST
+// @Path("export")
+// @Consumes(APPLICATION_JSON)
+// @RequireOrganizationAccess
+// public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
+// @QueryParam("callback") @DefaultValue("") String callback )
+// throws OAuthSystemException {
+//
+// if (logger.isTraceEnabled()) {
+// logger.trace("executePostJson");
+// }
+//
+// Map<String, String> uuidRet = new HashMap<>();
+//
+// try {
+// Object propertiesObj = json.get("properties");
+// if (propertiesObj == null) {
+// throw new NullArgumentException("Could not find 'properties'");
+// }
+// if (!(propertiesObj instanceof Map)) {
+// throw new IllegalArgumentException("'properties' not a map");
+// }
+//
+// @SuppressWarnings("unchecked")
+// Map<String,Object> properties = (Map<String,Object>)propertiesObj;
+//
+// String storage_provider = ( String ) properties.get( "storage_provider" );
+// if(storage_provider == null) {
+// throw new NullArgumentException( "Could not find field 'storage_provider'" );
+// }
+//
+// Object storageInfoObj = properties.get("storage_info");
+// if(storageInfoObj == null) {
+// throw new NullArgumentException( "Could not find field 'storage_info'" );
+// }
+// @SuppressWarnings("unchecked")
+// Map<String,Object> storage_info = (Map<String, Object>)storageInfoObj;
+//
+// String bucketName = ( String ) storage_info.get( "bucket_location" );
+// String accessId = ( String ) storage_info.get( "s3_access_id" );
+// String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+// if ( bucketName == null ) {
+// throw new NullArgumentException( "Could not find field 'bucketName'" );
+// }
+// if ( accessId == null ) {
+// throw new NullArgumentException( "Could not find field 's3_access_id'" );
+// }
+// if ( secretKey == null ) {
+//
+// throw new NullArgumentException( "Could not find field 's3_key'" );
+// }
+//
+// json.put( "organizationId",organization.getUuid());
+//
+// UUID jobUUID = exportService.schedule( json );
+// uuidRet.put( "Export Entity", jobUUID.toString() );
+// }
+// catch ( NullArgumentException e ) {
+// return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+// }
+// catch ( Exception e ) {
+// //TODO:throw descriptive error message and or include on in the response
+// //TODO:fix below, it doesn't work if there is an exception. Make it look like the OauthResponse.
+// return Response.status( SC_INTERNAL_SERVER_ERROR ).type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+// }
+// return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+// }
+
+// @GET
+// @RequireOrganizationAccess
+// @Path("export/{exportEntity: [A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}}")
+// public Response exportGetJson( @Context UriInfo ui, @PathParam("exportEntity") UUID exportEntityUUIDStr,
+// @QueryParam("callback") @DefaultValue("") String callback ) throws Exception {
+//
+// Export entity;
+// try {
+// entity = smf.getServiceManager( emf.getManagementAppId() ).getEntityManager()
+// .get( exportEntityUUIDStr, Export.class );
+// }
+// catch ( Exception e ) { //this might not be a bad request and needs better error checking
+// return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+// }
+//
+// if ( entity == null ) {
+// return Response.status( SC_BAD_REQUEST ).build();
+// }
+//
+// return Response.status( SC_OK ).entity( entity).build();
+// }
protected Set<String> getSetFromCommaSeparatedString(String input) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index a8ed8dc..79973c3 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -74,8 +74,8 @@ public class ApplicationResource extends AbstractContextResource {
public static final String CONFIRM_APPLICATION_IDENTIFIER = "confirm_application_identifier";
- @Autowired
- protected ExportService exportService;
+ //@Autowired
+ //protected ExportService exportService;
OrganizationInfo organization;
UUID applicationId;
@@ -267,161 +267,161 @@ public class ApplicationResource extends AbstractContextResource {
return response;
}
- @POST
- @Path("export")
- @Consumes(APPLICATION_JSON)
- @RequireOrganizationAccess
- public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
- @QueryParam("callback") @DefaultValue("") String callback )
- throws OAuthSystemException {
-
- UsergridAwsCredentials uac = new UsergridAwsCredentials();
-
- UUID jobUUID = null;
- Map<String, String> uuidRet = new HashMap<String, String>();
-
- Map<String,Object> properties;
- Map<String, Object> storage_info;
-
- try {
- if((properties = ( Map<String, Object> ) json.get( "properties" )) == null){
- throw new NullArgumentException("Could not find 'properties'");
- }
- storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
- String storage_provider = ( String ) properties.get( "storage_provider" );
- if(storage_provider == null) {
- throw new NullArgumentException( "Could not find field 'storage_provider'" );
- }
- if(storage_info == null) {
- throw new NullArgumentException( "Could not find field 'storage_info'" );
- }
-
-
- String bucketName = ( String ) storage_info.get( "bucket_location" );
- String accessId = ( String ) storage_info.get( "s3_access_id" );
- String secretKey = ( String ) storage_info.get( "s3_key" );
-
- if ( bucketName == null ) {
- throw new NullArgumentException( "Could not find field 'bucketName'" );
- }
- if ( accessId == null ) {
- throw new NullArgumentException( "Could not find field 's3_access_id'" );
- }
- if ( secretKey == null ) {
-
- throw new NullArgumentException( "Could not find field 's3_key'" );
- }
-
- json.put("organizationId", organization.getUuid());
- json.put( "applicationId",applicationId);
-
- jobUUID = exportService.schedule( json );
- uuidRet.put( "Export Entity", jobUUID.toString() );
- }
- catch ( NullArgumentException e ) {
- return Response.status( SC_BAD_REQUEST )
- .type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
- }
- catch ( Exception e ) {
- // TODO: throw descriptive error message and or include on in the response
- // TODO: fix below, it doesn't work if there is an exception.
- // Make it look like the OauthResponse.
- return Response.status( SC_INTERNAL_SERVER_ERROR )
- .type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
- }
-
- return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
- }
-
- @POST
- @Path("collection/{collection_name}/export")
- @Consumes(APPLICATION_JSON)
- @RequireOrganizationAccess
- public Response exportPostJson( @Context UriInfo ui,
- @PathParam( "collection_name" ) String collection_name ,Map<String, Object> json,
- @QueryParam("callback") @DefaultValue("") String callback )
- throws OAuthSystemException {
-
- UsergridAwsCredentials uac = new UsergridAwsCredentials();
- UUID jobUUID = null;
- String colExport = collection_name;
- Map<String, String> uuidRet = new HashMap<String, String>();
-
- Map<String,Object> properties;
- Map<String, Object> storage_info;
-
- try {
- //checkJsonExportProperties(json);
- if((properties = ( Map<String, Object> ) json.get( "properties" )) == null){
- throw new NullArgumentException("Could not find 'properties'");
- }
- storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
- String storage_provider = ( String ) properties.get( "storage_provider" );
- if(storage_provider == null) {
- throw new NullArgumentException( "Could not find field 'storage_provider'" );
- }
- if(storage_info == null) {
- throw new NullArgumentException( "Could not find field 'storage_info'" );
- }
-
- String bucketName = ( String ) storage_info.get( "bucket_location" );
- String accessId = ( String ) storage_info.get( "s3_access_id" );
- String secretKey = ( String ) storage_info.get( "s3_key" );
-
- if ( accessId == null ) {
- throw new NullArgumentException( "Could not find field 's3_access_id'" );
- }
- if ( secretKey == null ) {
- throw new NullArgumentException( "Could not find field 's3_key'" );
- }
-
- if(bucketName == null) {
- throw new NullArgumentException( "Could not find field 'bucketName'" );
- }
-
- json.put( "organizationId",organization.getUuid() );
- json.put( "applicationId", applicationId);
- json.put( "collectionName", colExport);
-
- jobUUID = exportService.schedule( json );
- uuidRet.put( "Export Entity", jobUUID.toString() );
- }
- catch ( NullArgumentException e ) {
- return Response.status( SC_BAD_REQUEST )
- .type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) )
- .build();
- }
- catch ( Exception e ) {
-
- // TODO: throw descriptive error message and or include on in the response
- // TODO: fix below, it doesn't work if there is an exception.
- // Make it look like the OauthResponse.
-
- OAuthResponse errorMsg = OAuthResponse.errorResponse( SC_INTERNAL_SERVER_ERROR )
- .setErrorDescription( e.getMessage() )
- .buildJSONMessage();
-
- return Response.status( errorMsg.getResponseStatus() )
- .type( JSONPUtils.jsonMediaType( callback ) )
- .entity( ServiceResource.wrapWithCallback( errorMsg.getBody(), callback ) )
- .build();
- }
-
- return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
- }
-
-
- @Path( "imports" )
- public ImportsResource importGetJson( @Context UriInfo ui,
- @QueryParam( "callback" ) @DefaultValue( "" ) String callback )
- throws Exception {
-
-
- return getSubResource( ImportsResource.class ).init( organization, application );
- }
+// @POST
+// @Path("export")
+// @Consumes(APPLICATION_JSON)
+// @RequireOrganizationAccess
+// public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
+// @QueryParam("callback") @DefaultValue("") String callback )
+// throws OAuthSystemException {
+//
+// UsergridAwsCredentials uac = new UsergridAwsCredentials();
+//
+// UUID jobUUID = null;
+// Map<String, String> uuidRet = new HashMap<String, String>();
+//
+// Map<String,Object> properties;
+// Map<String, Object> storage_info;
+//
+// try {
+// if((properties = ( Map<String, Object> ) json.get( "properties" )) == null){
+// throw new NullArgumentException("Could not find 'properties'");
+// }
+// storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
+// String storage_provider = ( String ) properties.get( "storage_provider" );
+// if(storage_provider == null) {
+// throw new NullArgumentException( "Could not find field 'storage_provider'" );
+// }
+// if(storage_info == null) {
+// throw new NullArgumentException( "Could not find field 'storage_info'" );
+// }
+//
+//
+// String bucketName = ( String ) storage_info.get( "bucket_location" );
+// String accessId = ( String ) storage_info.get( "s3_access_id" );
+// String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+// if ( bucketName == null ) {
+// throw new NullArgumentException( "Could not find field 'bucketName'" );
+// }
+// if ( accessId == null ) {
+// throw new NullArgumentException( "Could not find field 's3_access_id'" );
+// }
+// if ( secretKey == null ) {
+//
+// throw new NullArgumentException( "Could not find field 's3_key'" );
+// }
+//
+// json.put("organizationId", organization.getUuid());
+// json.put( "applicationId",applicationId);
+//
+// jobUUID = exportService.schedule( json );
+// uuidRet.put( "Export Entity", jobUUID.toString() );
+// }
+// catch ( NullArgumentException e ) {
+// return Response.status( SC_BAD_REQUEST )
+// .type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+// }
+// catch ( Exception e ) {
+// // TODO: throw descriptive error message and or include on in the response
+// // TODO: fix below, it doesn't work if there is an exception.
+// // Make it look like the OauthResponse.
+// return Response.status( SC_INTERNAL_SERVER_ERROR )
+// .type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+// }
+//
+// return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+// }
+//
+// @POST
+// @Path("collection/{collection_name}/export")
+// @Consumes(APPLICATION_JSON)
+// @RequireOrganizationAccess
+// public Response exportPostJson( @Context UriInfo ui,
+// @PathParam( "collection_name" ) String collection_name ,Map<String, Object> json,
+// @QueryParam("callback") @DefaultValue("") String callback )
+// throws OAuthSystemException {
+//
+// UsergridAwsCredentials uac = new UsergridAwsCredentials();
+// UUID jobUUID = null;
+// String colExport = collection_name;
+// Map<String, String> uuidRet = new HashMap<String, String>();
+//
+// Map<String,Object> properties;
+// Map<String, Object> storage_info;
+//
+// try {
+// //checkJsonExportProperties(json);
+// if((properties = ( Map<String, Object> ) json.get( "properties" )) == null){
+// throw new NullArgumentException("Could not find 'properties'");
+// }
+// storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
+// String storage_provider = ( String ) properties.get( "storage_provider" );
+// if(storage_provider == null) {
+// throw new NullArgumentException( "Could not find field 'storage_provider'" );
+// }
+// if(storage_info == null) {
+// throw new NullArgumentException( "Could not find field 'storage_info'" );
+// }
+//
+// String bucketName = ( String ) storage_info.get( "bucket_location" );
+// String accessId = ( String ) storage_info.get( "s3_access_id" );
+// String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+// if ( accessId == null ) {
+// throw new NullArgumentException( "Could not find field 's3_access_id'" );
+// }
+// if ( secretKey == null ) {
+// throw new NullArgumentException( "Could not find field 's3_key'" );
+// }
+//
+// if(bucketName == null) {
+// throw new NullArgumentException( "Could not find field 'bucketName'" );
+// }
+//
+// json.put( "organizationId",organization.getUuid() );
+// json.put( "applicationId", applicationId);
+// json.put( "collectionName", colExport);
+//
+// jobUUID = exportService.schedule( json );
+// uuidRet.put( "Export Entity", jobUUID.toString() );
+// }
+// catch ( NullArgumentException e ) {
+// return Response.status( SC_BAD_REQUEST )
+// .type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) )
+// .build();
+// }
+// catch ( Exception e ) {
+//
+// // TODO: throw descriptive error message and or include on in the response
+// // TODO: fix below, it doesn't work if there is an exception.
+// // Make it look like the OauthResponse.
+//
+// OAuthResponse errorMsg = OAuthResponse.errorResponse( SC_INTERNAL_SERVER_ERROR )
+// .setErrorDescription( e.getMessage() )
+// .buildJSONMessage();
+//
+// return Response.status( errorMsg.getResponseStatus() )
+// .type( JSONPUtils.jsonMediaType( callback ) )
+// .entity( ServiceResource.wrapWithCallback( errorMsg.getBody(), callback ) )
+// .build();
+// }
+//
+// return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+// }
+//
+//
+// @Path( "imports" )
+// public ImportsResource importGetJson( @Context UriInfo ui,
+// @QueryParam( "callback" ) @DefaultValue( "" ) String callback )
+// throws Exception {
+//
+//
+// return getSubResource( ImportsResource.class ).init( organization, application );
+// }
@GET
@Path("/status")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index b43594a..d8c314e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -134,7 +134,14 @@ public class NotificationsService extends AbstractCollectionService {
postMeter.mark();
try {
- validate(null, context.getPayload());
+ validatePayload(null, context.getPayload());
+
+ final Object deliverValue = context.getProperties().get("deliver");
+ if ( deliverValue != null && deliverValue instanceof Number && (long)context.getProperties().get("deliver") < System.currentTimeMillis() ){
+ throw new IllegalArgumentException("Property 'deliver' cannot have a value in the past. It is expected to be a unix timestamp in milliseconds");
+ } else if ( deliverValue != null && !(deliverValue instanceof Number) ){
+ throw new IllegalArgumentException("Property 'deliver' must be a number. It is expected to be a unix timestamp in milliseconds");
+ }
// perform some input validates on useGraph payload property vs. ql= path query
final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters();
@@ -227,7 +234,7 @@ public class NotificationsService extends AbstractCollectionService {
public Entity updateEntity(ServiceRequest request, EntityRef ref,
ServicePayload payload) throws Exception {
- validate(ref, payload);
+ validatePayload(ref, payload);
Notification notification = em.get(ref, Notification.class);
@@ -273,7 +280,7 @@ public class NotificationsService extends AbstractCollectionService {
}
// validate payloads
- private void validate(EntityRef ref, ServicePayload servicePayload)
+ private void validatePayload(EntityRef ref, ServicePayload servicePayload)
throws Exception {
Object obj_payloads = servicePayload.getProperty("payloads");
if (obj_payloads == null && ref == null) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index bcfd654..2a757ca 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -33,12 +33,8 @@ import java.util.*;
import org.apache.usergrid.services.ServiceAction;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
import static org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
+import static org.junit.Assert.*;
@NotThreadSafe
@@ -809,6 +805,49 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
}
}
+ @Test
+ public void pushNotificationBadDeliverProperty() throws Exception {
+ // Posts two notifications with an invalid 'deliver' property and checks the IllegalArgumentException caught for each.
+ // case 1: 'deliver' is a timestamp in the past
+ app.clear();
+ String payload = getPayload();
+ Map<String, String> payloads = new HashMap<String, String>(1);
+ payloads.put(notifier.getName(), payload);
+ app.put("payloads", payloads);
+ app.put("deliver", System.currentTimeMillis() - 1000); // 1000 ms before now, i.e. already in the past
+ app.put("debug",true);
+
+ // post the notification to device1; NOTE(review): no fail() follows the request, so this case also passes if nothing is thrown
+ try {
+ app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications").getEntity();
+ } catch (Exception e) {
+ // expected: IllegalArgumentException because 'deliver' is in the past
+ assertEquals(e.getClass(), IllegalArgumentException.class);
+ assertTrue(e.getMessage().contains("Property 'deliver' cannot have a value in the past"));
+ }
+
+
+ // case 2: 'deliver' is a string instead of a number
+ app.clear();
+ payload = getPayload();
+ payloads = new HashMap<String, String>(1);
+ payloads.put(notifier.getName(), payload);
+ app.put("payloads", payloads);
+ app.put("deliver", "notatime"); // a non-numeric string, not a timestamp
+ app.put("debug",true);
+
+ // post the notification to device1; NOTE(review): no fail() follows the request, so this case also passes if nothing is thrown
+ try {
+ app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications").getEntity();
+ } catch (Exception e) {
+ // expected: IllegalArgumentException because 'deliver' is not a number
+ assertEquals(e.getClass(), IllegalArgumentException.class);
+ assertTrue(e.getMessage().contains("Property 'deliver' must be a number"));
+ }
+
+
+ }
+
private String getPayload(){
ApnsPayloadBuilder builder = new ApnsPayloadBuilder();
builder.setAlertBody("Hello, World!");
[4/4] usergrid git commit: Fix issue where duplicate receipts were
created when a notification failed.
Posted by mr...@apache.org.
Fix issue where duplicate receipts were created when a notification failed.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b63aae7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b63aae7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b63aae7
Branch: refs/heads/master
Commit: 8b63aae7def01cbf4f83e13bc5ef1b237344174b
Parents: b37fc81
Author: Michael Russo <ru...@google.com>
Authored: Fri Mar 31 00:40:39 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Fri Mar 31 00:40:39 2017 -0700
----------------------------------------------------------------------
.../org/apache/usergrid/services/notifications/TaskManager.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b63aae7/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 870cae9..678b88a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -112,7 +112,6 @@ public class TaskManager {
this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID ), receipt, true );
}
- completed(notifier, deviceUUID);
finishedBatch();
} catch (Exception e){