You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/10 00:04:33 UTC

[13/16] Refactored loader to group for batching, then call filters to perform the load, verification and result construction

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
new file mode 100644
index 0000000..feed396
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+
+
+package org.apache.usergrid.corepersistence.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.BooleanField;
+import org.apache.usergrid.persistence.model.field.ByteArrayField;
+import org.apache.usergrid.persistence.model.field.DoubleField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.FloatField;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.ListField;
+import org.apache.usergrid.persistence.model.field.LocationField;
+import org.apache.usergrid.persistence.model.field.LongField;
+import org.apache.usergrid.persistence.model.field.SetField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.apache.usergrid.persistence.model.field.value.Location;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utilities for converting between plain maps and Core Persistence entities.
+ * Uniqueness of top-level properties is looked up via the default Schema.
+ */
+public class CpEntityMapUtils {
+    private static final Logger logger = LoggerFactory.getLogger( CpEntityMapUtils.class );
+
+    /** Mapper used to (de)serialize values that have no dedicated field type. */
+    public static ObjectMapper objectMapper = new ObjectMapper(  );
+
+
+    /**
+     * Create a new Entity populated from the given map.
+     *
+     * @param map property names mapped to their values
+     * @param entityType type name used for the unique-property lookup
+     * @param topLevel true if the map describes a top-level entity; fields of nested
+     * objects are never flagged unique
+     * @return a new Entity holding one field per supported map entry
+     */
+    public static Entity fromMap( Map<String, Object> map, String entityType, boolean topLevel ) {
+        return fromMap( null, map, entityType, topLevel );
+    }
+
+
+    /**
+     * Populate an Entity from the given map, one field per entry. Entries whose value is
+     * null are skipped. Values without a dedicated field type are serialized with
+     * {@link #objectMapper} and stored in a ByteArrayField together with their class.
+     *
+     * @param entity entity to populate, or null to create a new one
+     * @param map property names mapped to their values
+     * @param entityType type name used for the unique-property lookup
+     * @param topLevel true if the map describes a top-level entity; fields of nested
+     * objects are never flagged unique
+     * @return the populated entity (the one passed in, when it was not null)
+     * @throws RuntimeException if a value cannot be serialized, or a list holds elements
+     * of an unsupported type
+     */
+    public static Entity fromMap( Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
+
+        if ( entity == null ) {
+            entity = new Entity();
+        }
+
+        for ( Map.Entry<String, Object> entry : map.entrySet() ) {
+
+            String fieldName = entry.getKey();
+            Object value = entry.getValue();
+
+            // a property is only treated as unique on the top-level entity
+            boolean unique = Schema.getDefaultSchema().isPropertyUnique( entityType, fieldName ) && topLevel;
+
+            if ( value instanceof String ) {
+                entity.setField( new StringField( fieldName, (String)value, unique ));
+
+            } else if ( value instanceof Boolean ) {
+                entity.setField( new BooleanField( fieldName, (Boolean)value, unique ));
+
+            } else if ( value instanceof Integer ) {
+                entity.setField( new IntegerField( fieldName, (Integer)value, unique ));
+
+            } else if ( value instanceof Double ) {
+                entity.setField( new DoubleField( fieldName, (Double)value, unique ));
+
+            } else if ( value instanceof Float ) {
+                entity.setField( new FloatField( fieldName, (Float)value, unique ));
+
+            } else if ( value instanceof Long ) {
+                entity.setField( new LongField( fieldName, (Long)value, unique ));
+
+            } else if ( value instanceof List ) {
+                entity.setField( listToListField( fieldName, (List)value, entityType ));
+
+            } else if ( value instanceof UUID ) {
+                entity.setField( new UUIDField( fieldName, (UUID)value, unique ));
+
+            } else if ( value instanceof Map ) {
+                processMapValue( value, fieldName, entity, entityType );
+
+            } else if ( value instanceof Enum ) {
+                entity.setField( new StringField( fieldName, value.toString(), unique ));
+
+            } else if ( value != null ) {
+                // no dedicated field type: keep the serialized bytes plus the class needed to read them back
+                entity.setField( new ByteArrayField( fieldName, serialize( value ), value.getClass() ));
+            }
+        }
+
+        return entity;
+    }
+
+
+    /**
+     * Serialize an arbitrary value to bytes using {@link #objectMapper}.
+     *
+     * @throws RuntimeException wrapping the underlying failure if serialization fails
+     */
+    private static byte[] serialize( Object value ) {
+        try {
+            return objectMapper.writeValueAsBytes( value );
+        }
+        catch ( IOException e ) {
+            // JsonProcessingException is an IOException, so a single handler covers both
+            throw new RuntimeException( "Can't serialize object ",e );
+        }
+    }
+
+
+    /**
+     * Store a map value on the entity: as a LocationField when the map looks like a
+     * location, otherwise as a nested EntityObjectField built recursively.
+     */
+    private static void processMapValue(
+            Object value, String fieldName, Entity entity, String entityType) {
+
+        Map<String, Object> m = (Map<String, Object>)value;
+
+        Field field = toLocationField( fieldName, m );
+
+        if ( field == null ) {
+            // not a location element, process it as a nested (never top-level) object
+            field = new EntityObjectField( fieldName, fromMap( m, entityType, false )); // recursion
+        }
+
+        entity.setField( field );
+    }
+
+
+    /**
+     * Interpret a map as a location. The map qualifies when it has exactly two entries,
+     * named either latitude/longitude or lat/lon, and both values parse as doubles.
+     *
+     * @return the location field, or null if the map is not a location
+     */
+    private static Field toLocationField( String fieldName, Map<String, Object> m ) {
+
+        if ( m.size() != 2 ) {
+            return null;
+        }
+
+        Object rawLat = m.get( "latitude" );
+        Object rawLon = m.get( "longitude" );
+
+        if ( rawLat == null || rawLon == null ) {
+            rawLat = m.get( "lat" );
+            rawLon = m.get( "lon" );
+        }
+
+        if ( rawLat == null || rawLon == null ) {
+            return null;
+        }
+
+        Double lat;
+        Double lon;
+        try {
+            lat = Double.parseDouble( rawLat.toString() );
+            lon = Double.parseDouble( rawLon.toString() );
+        }
+        catch ( NumberFormatException ignored ) {
+            // values are not numeric, so the caller treats the map as a plain nested object
+            return null;
+        }
+
+        return new LocationField( fieldName, new Location( lat, lon ));
+    }
+
+
+    /**
+     * Convert a list to a ListField. The element type is decided by the first element only.
+     *
+     * @throws RuntimeException if the first element is null or of an unsupported type
+     */
+    private static ListField listToListField( String fieldName, List list, String entityType ) {
+
+        if (list.isEmpty()) {
+            return new ListField( fieldName );
+        }
+
+        Object sample = list.get(0);
+
+        if ( sample instanceof Map ) {
+            return new ListField<Entity>( fieldName, processListForField( list, entityType ));
+
+        } else if ( sample instanceof List ) {
+            return new ListField<List>( fieldName, processListForField( list, entityType ));
+
+        } else if ( sample instanceof String ) {
+            return new ListField<String>( fieldName, (List<String>)list );
+
+        } else if ( sample instanceof Boolean ) {
+            return new ListField<Boolean>( fieldName, (List<Boolean>)list );
+
+        } else if ( sample instanceof Integer ) {
+            return new ListField<Integer>( fieldName, (List<Integer>)list );
+
+        } else if ( sample instanceof Double ) {
+            return new ListField<Double>( fieldName, (List<Double>)list );
+
+        } else if ( sample instanceof Long ) {
+            return new ListField<Long>( fieldName, (List<Long>)list );
+
+        } else {
+            // a null first element used to surface as a bare NullPointerException here
+            String typeName = sample == null ? "null" : sample.getClass().getName();
+            throw new RuntimeException("Unknown type " + typeName);
+        }
+    }
+
+
+    /**
+     * Prepare list elements for storage in a ListField: maps become nested entities and
+     * nested lists are processed recursively. The shape is decided by the first element;
+     * lists of anything else are returned unchanged.
+     */
+    private static List processListForField( List list, String entityType ) {
+        if ( list.isEmpty() ) {
+            return list;
+        }
+        Object sample = list.get(0);
+
+        if ( sample instanceof Map ) {
+            List<Entity> newList = new ArrayList<Entity>();
+            for ( Map<String, Object> map : (List<Map<String, Object>>)list ) {
+                newList.add( fromMap( map, entityType, false ) );
+            }
+            return newList;
+
+        } else if ( sample instanceof List ) {
+            // recurse into each nested list; recursing on "list" itself would never terminate
+            List<Object> newList = new ArrayList<Object>();
+            for ( Object element : list ) {
+                if ( element instanceof List ) {
+                    newList.add( processListForField( (List)element, entityType ) ); // recursion
+                } else {
+                    newList.add( element );
+                }
+            }
+            return newList;
+
+        } else {
+            return list;
+        }
+    }
+
+
+    /**
+     * Convert an EntityObject to a map keyed by field name, sorted by key. List, array
+     * and set fields become ArrayLists, nested entity objects become nested maps,
+     * byte-array fields are deserialized back to the class recorded on the field, and
+     * every other field contributes its value unchanged.
+     *
+     * @param entity the entity object to convert
+     * @return map of field names to converted values
+     * @throws RuntimeException if a byte-array field cannot be deserialized
+     */
+    public static Map toMap(EntityObject entity) {
+
+        Map<String, Object> entityMap = new TreeMap<>();
+
+        for (Object f : entity.getFields().toArray()) {
+            Field field = (Field) f;
+
+            if (f instanceof ListField || f instanceof ArrayField) {
+                List list = (List) field.getValue();
+                entityMap.put(field.getName(),
+                        new ArrayList( processCollectionForMap(list)));
+
+            } else if (f instanceof SetField) {
+                Set set = (Set) field.getValue();
+                entityMap.put(field.getName(),
+                        new ArrayList( processCollectionForMap(set)));
+
+            } else if (f instanceof EntityObjectField) {
+                EntityObject eo = (EntityObject) field.getValue();
+                entityMap.put( field.getName(), toMap(eo)); // recursion
+
+            } else if (f instanceof StringField) {
+                entityMap.put(field.getName(), ((String) field.getValue()));
+
+            } else if (f instanceof LocationField) {
+                // the Location value object itself is stored, not a lat/lon map
+                entityMap.put( field.getName(), field.getValue());
+
+            } else if (f instanceof ByteArrayField) {
+                ByteArrayField bf = ( ByteArrayField ) f;
+
+                byte[] serializedObj = bf.getValue();
+                Object o;
+                try {
+                    o = objectMapper.readValue( serializedObj, bf.getClassinfo() );
+                }
+                catch ( IOException e ) {
+                    throw new RuntimeException( "Can't deserialize object ",e );
+                }
+                entityMap.put( bf.getName(), o );
+            }
+            else {
+                entityMap.put( field.getName(), field.getValue());
+            }
+        }
+
+        return entityMap;
+    }
+
+
+    /**
+     * Convert collection elements for inclusion in a map: entities become maps and nested
+     * lists and sets are processed recursively. The shape is decided by the first element.
+     * An empty collection is returned as-is; otherwise a new list is returned.
+     */
+    private static Collection processCollectionForMap(Collection c) {
+        if (c.isEmpty()) {
+            return c;
+        }
+        List processed = new ArrayList();
+        Object sample = c.iterator().next();
+
+        if (sample instanceof Entity) {
+            for (Object o : c.toArray()) {
+                processed.add(toMap((Entity) o));
+            }
+
+        } else if (sample instanceof List) {
+            for (Object o : c.toArray()) {
+                processed.add(processCollectionForMap((List) o)); // recursion
+            }
+
+        } else if (sample instanceof Set) {
+            for (Object o : c.toArray()) {
+                processed.add(processCollectionForMap((Set) o)); // recursion
+            }
+
+        } else {
+            for (Object o : c.toArray()) {
+                processed.add(o);
+            }
+        }
+        return processed;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
new file mode 100644
index 0000000..f2fce47
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -0,0 +1,106 @@
+package org.apache.usergrid.corepersistence.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Schema;
+
+
+/**
+ * Utilities for constructing standard naming conventions for collections and connections
+ */
+public class CpNamingUtils {
+
+    /**
+     * Edge types for all types
+     */
+    public static final String ALL_TYPES = "zzzalltypeszzz";
+
+    /**
+     * Marker for collection edge types and scope names. Despite the name it is used as a
+     * prefix; the name is kept for compatibility.
+     */
+    public static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
+
+    /**
+     * Marker for connection edge types and scope names. Despite the name it is used as a
+     * prefix; the name is kept for compatibility.
+     */
+    public static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
+
+    /** Separates the marker from the name inside an edge type. */
+    private static final String EDGE_TYPE_SEPARATOR = "|";
+
+
+    /**
+     * Build the collection scope name for an entity type: the collection marker followed
+     * by the schema's default collection name for the type, lower-cased.
+     */
+    public static String getCollectionScopeNameFromEntityType( String type ) {
+        return toLowerCase( EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type ) );
+    }
+
+
+    /**
+     * Build the collection scope name for a collection name: the collection marker
+     * followed by the name, lower-cased.
+     */
+    public static String getCollectionScopeNameFromCollectionName( String name ) {
+        return toLowerCase( EDGE_COLL_SUFFIX + name );
+    }
+
+
+    /**
+     * Build the connection scope name: the connection marker followed by the connection
+     * type and then the entity type, lower-cased.
+     */
+    public static String getConnectionScopeName( String entityType, String connectionType ) {
+        return toLowerCase( EDGE_CONN_SUFFIX + connectionType + entityType );
+    }
+
+
+    /**
+     * @return true if the given edge type starts with the collection marker
+     */
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( EDGE_COLL_SUFFIX );
+    }
+
+
+    /**
+     * @return true if the given edge type starts with the connection marker
+     */
+    public  static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( EDGE_CONN_SUFFIX );
+    }
+
+
+    /**
+     * Extract the connection type from an edge type of the form produced by
+     * {@link #getEdgeTypeFromConnectionType(String)}.
+     *
+     * @throws IllegalArgumentException if the edge type has no name after a '|'
+     */
+    static public String getConnectionType( String edgeType ) {
+        return nameSegment( edgeType );
+    }
+
+
+    /**
+     * Extract the collection name from an edge type of the form produced by
+     * {@link #getEdgeTypeFromCollectionName(String)}.
+     *
+     * @throws IllegalArgumentException if the edge type has no name after a '|'
+     */
+    static public String getCollectionName( String edgeType ) {
+        return nameSegment( edgeType );
+    }
+
+
+    /**
+     * Build the edge type for a connection type: the connection marker, a '|', then the
+     * connection type.
+     *
+     * @return the edge type, or null if connectionType is null
+     */
+    public static String getEdgeTypeFromConnectionType( String connectionType ) {
+
+        if ( connectionType == null ) {
+            return null;
+        }
+
+        return EDGE_CONN_SUFFIX + EDGE_TYPE_SEPARATOR + connectionType;
+    }
+
+
+    /**
+     * Build the edge type for a collection name: the collection marker, a '|', then the
+     * collection name.
+     *
+     * @return the edge type, or null if collectionName is null
+     */
+    public static String getEdgeTypeFromCollectionName( String collectionName ) {
+
+        if ( collectionName == null ) {
+            return null;
+        }
+
+        return EDGE_COLL_SUFFIX + EDGE_TYPE_SEPARATOR + collectionName;
+    }
+
+
+    /**
+     * Lower-case a generated name independently of the JVM default locale, so the same
+     * input always yields the same name.
+     */
+    private static String toLowerCase( String name ) {
+        return name.toLowerCase( java.util.Locale.ROOT );
+    }
+
+
+    /**
+     * Return the second '|'-delimited segment of an edge type.
+     *
+     * @throws IllegalArgumentException if there is no such segment
+     */
+    private static String nameSegment( String edgeType ) {
+        String[] parts = edgeType.split( "\\|" );
+
+        // fail with a message naming the bad input rather than a bare index error
+        if ( parts.length < 2 ) {
+            throw new IllegalArgumentException(
+                    "Edge type '" + edgeType + "' has no name after '" + EDGE_TYPE_SEPARATOR + "'" );
+        }
+
+        return parts[1];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
index 87f6b46..a75b9fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.ListField;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index c5d5782..5a1eca2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;