You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:22 UTC
[15/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
new file mode 100755
index 0000000..bed61af
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericMetricShadowEntity;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * This object should be regarded as read-only metadata for an entity: it is shared across all entity objects
+ * with the same entity name, so do not set different values for any of its fields once it is shared;
+ * otherwise it is not thread safe.
+ */
+public class EntityDefinition implements Writable{
+ private final static Logger LOG = LoggerFactory.getLogger(EntityDefinition.class);
+
+ // Entity class this definition describes; write() serializes it by class name.
+ private Class<? extends TaggedLogAPIEntity> entityClass;
+ // Table name (serialized by write()).
+ private String table;
+ // Column family name (serialized by write()).
+ private String columnFamily;
+ // TODO prefix be within search/get condition instead of entity definition. Topology entity should have pre-defined prefix.
+ private String prefix;
+ // Service name the entity is registered under (serialized by write()).
+ private String service;
+ // Not serialized by write()/readFields() and not copied by setEntityDefinition().
+ private String serviceCreationPath;
+ // Not serialized by write()/readFields() and not copied by setEntityDefinition().
+ private String serviceDeletionPath;
+ // Names of partition fields; may be null when the entity has no partitions (see isPartitionTag()).
+ private String[] partitions;
+ // Qualifiers keyed by display name.
+ private Map<String, Qualifier> displayNameMap = new HashMap<String, Qualifier>();
+ // Qualifiers keyed by qualifier name.
+ private Map<String, Qualifier> qualifierNameMap = new HashMap<String, Qualifier>();
+ // Getter methods used by getValue(); not serialized by write()/readFields() (see the TODOs there).
+ private Map<String, Method> qualifierGetterMap = new HashMap<String, Method>();
+ private boolean isTimeSeries;
+ // Null when the entity is not a metric entity; write() emits a presence flag before it.
+ private MetricDefinition metricDefinition;
+ // Not serialized by write()/readFields() (see the TODOs there).
+ private IndexDefinition[] indexes;
+
+
+ /**
+ * Creates an empty definition; fields are populated through the setters below or by readFields().
+ */
+ public EntityDefinition(){}
+
+ // ---------------------------------------------------------------------------------------------
+ // Plain accessors. Getters return the internal references directly (maps and arrays are NOT
+ // copied), and setters store the given reference as-is, so callers share mutable state.
+ // ---------------------------------------------------------------------------------------------
+
+ /** @return the metric definition, or null when none has been set */
+ public MetricDefinition getMetricDefinition() {
+ return metricDefinition;
+ }
+ public void setMetricDefinition(MetricDefinition metricDefinition) {
+ this.metricDefinition = metricDefinition;
+ }
+ public boolean isTimeSeries() {
+ return isTimeSeries;
+ }
+ public void setTimeSeries(boolean isTimeSeries) {
+ this.isTimeSeries = isTimeSeries;
+ }
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+ public void setColumnFamily(String columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+ public Class<? extends TaggedLogAPIEntity> getEntityClass() {
+ return entityClass;
+ }
+ public void setEntityClass(Class<? extends TaggedLogAPIEntity> entityClass) {
+ this.entityClass = entityClass;
+ }
+ public String getTable() {
+ return table;
+ }
+ public void setTable(String table) {
+ this.table = table;
+ }
+ /** @return the live display-name map (not a copy) */
+ public Map<String, Qualifier> getDisplayNameMap() {
+ return displayNameMap;
+ }
+ public void setDisplayNameMap(Map<String, Qualifier> displayNameMap) {
+ this.displayNameMap = displayNameMap;
+ }
+ /** @return the live qualifier-name map (not a copy) */
+ public Map<String, Qualifier> getQualifierNameMap() {
+ return qualifierNameMap;
+ }
+ public void setQualifierNameMap(Map<String, Qualifier> qualifierNameMap) {
+ this.qualifierNameMap = qualifierNameMap;
+ }
+ public String getPrefix() {
+ return prefix;
+ }
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+ public String getService() {
+ return service;
+ }
+ public void setService(String service) {
+ this.service = service;
+ }
+ public String getServiceCreationPath() {
+ return serviceCreationPath;
+ }
+ public void setServiceCreationPath(String serviceCreationPath) {
+ this.serviceCreationPath = serviceCreationPath;
+ }
+ public String getServiceDeletionPath() {
+ return serviceDeletionPath;
+ }
+ public void setServiceDeletionPath(String serviceDeletionPath) {
+ this.serviceDeletionPath = serviceDeletionPath;
+ }
+ /** @return the partition field names (the internal array, not a copy), or null when unset */
+ public String[] getPartitions() {
+ return partitions;
+ }
+ public void setPartitions(String[] partitions) {
+ this.partitions = partitions;
+ }
+ /** @return the index definitions (the internal array, not a copy), or null when unset */
+ public IndexDefinition[] getIndexes() {
+ return indexes;
+ }
+ public void setIndexes(IndexDefinition[] indexes) {
+ this.indexes = indexes;
+ }
+ /** @return the live getter map (not a copy) used by getValue() */
+ public Map<String, Method> getQualifierGetterMap() {
+ return qualifierGetterMap;
+ }
+ public void setQualifierGetterMap(Map<String, Method> qualifierGetterMap) {
+ this.qualifierGetterMap = qualifierGetterMap;
+ }
+// public Map<String,String> getQualifierDisplayNameMap(){
+// Map<String,String> qualifierDisplayNameMap = new HashMap<String, String>();
+// for(Map.Entry<String,Qualifier> entry: qualifierNameMap.entrySet()){
+// qualifierDisplayNameMap.put(entry.getKey(),entry.getValue().getDisplayName());
+// }
+// return qualifierDisplayNameMap;
+// }
+
+ /**
+  * Tells whether a field is a tag: a field is a tag when it has a non-null entry in neither
+  * {@code qualifierNameMap} nor {@code displayNameMap}.
+  *
+  * @param field the field name to look up
+  * @return {@code true} when neither map resolves {@code field} to a qualifier
+  */
+ public boolean isTag(String field) {
+     // A hit in the qualifier-name map is enough to rule the field out as a tag.
+     if (qualifierNameMap.get(field) != null) {
+         return false;
+     }
+     return displayNameMap.get(field) == null;
+ }
+
+ /**
+  * Checks whether the given field is one of this entity's partition fields and is also a tag.
+  *
+  * @param field the field name to test
+  * @return {@code true} only when partitions are defined, {@code field} is a tag according to
+  *         {@link #isTag(String)}, and one of the partition names equals {@code field}
+  */
+ public boolean isPartitionTag(String field) {
+     if (partitions == null) {
+         return false;
+     }
+     if (!isTag(field)) {
+         return false;
+     }
+     for (int i = 0; i < partitions.length; i++) {
+         if (partitions[i].equals(field)) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Reads the value of {@code field} from {@code entity}.
+  *
+  * <p>When the entity's class differs from this definition's entity class, the only accepted case is a
+  * {@link GenericMetricShadowEntity} read through a {@link GenericMetricEntity} definition, for which
+  * the shadow entity's value is returned regardless of {@code field}.
+  *
+  * @param entity the entity to read from
+  * @param field  a qualifier field name (resolved through the registered getter) or a tag name
+  * @return the getter result, the tag value, or {@code null} when no getter is registered and the
+  *         entity has no tags (or no tag with that name)
+  * @throws IllegalArgumentException when {@code entity} is of an unexpected type
+  * @throws IllegalAccessException when the getter is not accessible
+  * @throws InvocationTargetException when the getter itself throws
+  */
+ public Object getValue(TaggedLogAPIEntity entity, String field) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+     final Class<?> actualType = entity.getClass();
+     if (!entityClass.equals(actualType)) {
+         final boolean shadowOfMetric = entityClass.equals(GenericMetricEntity.class)
+                 && actualType.equals(GenericMetricShadowEntity.class);
+         if (!shadowOfMetric) {
+             throw new IllegalArgumentException("Invalid entity type: " + entity.getClass().getSimpleName());
+         }
+         return ((GenericMetricShadowEntity) entity).getValue();
+     }
+
+     final Method getter = qualifierGetterMap.get(field);
+     if (getter != null) {
+         return getter.invoke(entity);
+     }
+
+     // No getter registered: the field is a tag.
+     if (entity.getTags() == null) {
+         return null;
+     }
+     return entity.getTags().get(field);
+ }
+
+
+ /**
+  * Serializes this definition. Field order (must stay in sync with {@code readFields}):
+  * entity class name, table, column family, prefix, service, partition count followed by the
+  * partition names, the display-name map, the qualifier-name map, the time-series flag, a
+  * metric-definition presence flag and, when present, the metric definition itself.
+  *
+  * <p>{@code qualifierGetterMap}, {@code indexes}, {@code serviceCreationPath} and
+  * {@code serviceDeletionPath} are not written.
+  *
+  * @param out the sink to write to
+  * @throws IOException when writing to {@code out} fails
+  */
+ @Override
+ public void write(DataOutput out) throws IOException {
+     out.writeUTF(entityClass.getName());
+     out.writeUTF(table);
+     out.writeUTF(columnFamily);
+     out.writeUTF(prefix);
+     out.writeUTF(service);
+
+     // A missing partition array is encoded as a zero count.
+     if (partitions == null) {
+         out.writeInt(0);
+     } else {
+         out.writeInt(partitions.length);
+         for (String partition : partitions) {
+             out.writeUTF(partition);
+         }
+     }
+
+     writeQualifierMap(out, displayNameMap);
+     writeQualifierMap(out, qualifierNameMap);
+
+     // TODO: write qualifierGetterMap
+     out.writeBoolean(isTimeSeries);
+
+     if (metricDefinition == null) {
+         out.writeBoolean(false);
+     } else {
+         out.writeBoolean(true);
+         metricDefinition.write(out);
+     }
+
+     // TODO: write indexes
+ }
+
+ /** Writes the entry count, then each key followed by its serialized qualifier. */
+ private static void writeQualifierMap(DataOutput out, Map<String, Qualifier> qualifiers) throws IOException {
+     out.writeInt(qualifiers.size());
+     for (Map.Entry<String, Qualifier> entry : qualifiers.entrySet()) {
+         out.writeUTF(entry.getKey());
+         entry.getValue().write(out);
+     }
+ }
+
+
+ /**
+  * Overwrites this definition's state with the state of {@code ed}. References are copied as-is
+  * (maps and arrays are shared with {@code ed}, not cloned).
+  *
+  * <p>NOTE(review): {@code serviceCreationPath} and {@code serviceDeletionPath} are not copied;
+  * confirm whether that is intentional.
+  *
+  * @param ed the definition to copy from
+  */
+ public void setEntityDefinition(EntityDefinition ed){
+     // identity and storage location
+     entityClass = ed.getEntityClass();
+     service = ed.getService();
+     table = ed.getTable();
+     columnFamily = ed.getColumnFamily();
+     prefix = ed.getPrefix();
+     partitions = ed.getPartitions();
+     indexes = ed.getIndexes();
+
+     // qualifier lookups (shared references)
+     qualifierNameMap = ed.getQualifierNameMap();
+     displayNameMap = ed.getDisplayNameMap();
+     qualifierGetterMap = ed.getQualifierGetterMap();
+
+     // time-series / metric settings
+     isTimeSeries = ed.isTimeSeries();
+     metricDefinition = ed.metricDefinition;
+ }
+
+ //////////////////////////////////////////////
+ // TODO: Cache object for reading in region side
+ //////////////////////////////////////////////
+ // private final static Map<String,EntityDefinition> _classEntityDefinitionCache = new HashMap<String, EntityDefinition>();
+
+ /**
+  * Restores this definition from {@code in}, consuming the fields in the order {@code write} emits
+  * them: entity class name, table, column family, prefix, service, partitions, display-name map,
+  * qualifier-name map, time-series flag and the optional metric definition.
+  *
+  * <p>The state that is read replaces the previous state: both qualifier maps are replaced by
+  * freshly built maps and {@code metricDefinition} is reset to {@code null} when the stream carries
+  * none, so an instance that is reused for several reads does not keep entries from an earlier read.
+  * {@code qualifierGetterMap}, {@code indexes}, {@code serviceCreationPath} and
+  * {@code serviceDeletionPath} are not part of the stream and are left untouched.
+  *
+  * <p>Loading the entity class is best effort: when the class cannot be loaded the failure is
+  * logged and {@code entityClass} keeps its previous value.
+  *
+  * @param in the source to read from
+  * @throws IOException when reading from {@code in} fails
+  */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+     final String entityClassName = in.readUTF();
+     // TODO: cache definitions per entity class name for reading on the region side.
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Reading EntityDefinition entity: " + entityClassName);
+     }
+     try {
+         @SuppressWarnings("unchecked")
+         final Class<? extends TaggedLogAPIEntity> loaded =
+                 (Class<? extends TaggedLogAPIEntity>) Class.forName(entityClassName);
+         entityClass = loaded;
+     } catch (Exception e) {
+         // Still best effort (the remaining fields must be consumed), but no longer silent.
+         LOG.warn("Failed to load entity class " + entityClassName + ", entityClass is left unchanged", e);
+     }
+     table = in.readUTF();
+     columnFamily = in.readUTF();
+     prefix = in.readUTF();
+     service = in.readUTF();
+
+     final int partitionsLen = in.readInt();
+     partitions = new String[partitionsLen];
+     for (int i = 0; i < partitionsLen; i++) {
+         partitions[i] = in.readUTF();
+     }
+
+     // Build new maps instead of appending to the current ones: the current maps may hold entries
+     // from a previous read, and may be shared with other definitions via setEntityDefinition().
+     displayNameMap = readQualifierMap(in);
+     qualifierNameMap = readQualifierMap(in);
+
+     // TODO: readFields qualifierGetterMap
+     isTimeSeries = in.readBoolean();
+
+     final boolean hasMetricDefinition = in.readBoolean();
+     if (hasMetricDefinition) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("reading metricDefinition");
+         }
+         final MetricDefinition md = new MetricDefinition();
+         md.readFields(in);
+         metricDefinition = md;
+     } else {
+         // Do not keep a metric definition left over from a previous read.
+         metricDefinition = null;
+     }
+     // TODO: readFields indexes
+ }
+
+ /** Reads an entry count followed by that many (key, qualifier) pairs into a new map. */
+ private static Map<String, Qualifier> readQualifierMap(DataInput in) throws IOException {
+     final int size = in.readInt();
+     final Map<String, Qualifier> qualifiers = new HashMap<String, Qualifier>();
+     for (int i = 0; i < size; i++) {
+         final String key = in.readUTF();
+         final Qualifier value = new Qualifier();
+         value.readFields(in);
+         qualifiers.put(key, value);
+     }
+     return qualifiers;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
new file mode 100755
index 0000000..e144e05
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.repo.EntityRepositoryScanner;
+import org.mockito.cglib.beans.BeanGenerator;
+import org.mockito.cglib.core.NamingPolicy;
+import org.mockito.cglib.core.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * static initialization of all registered entities. As of now, dynamic registration is not supported
+ */
+public class EntityDefinitionManager {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityDefinitionManager.class);
+ // Set to true by checkInit() after EntityRepositoryScanner.scan() has completed.
+ private static volatile boolean initialized = false;
+ /**
+ * using concurrent hashmap is due to the fact that entity can be registered any time from any thread
+ */
+ // service name -> entity definition
+ private static Map<String, EntityDefinition> entityServiceMap = new ConcurrentHashMap<String, EntityDefinition>();
+ // entity class -> entity definition
+ private static Map<Class<? extends TaggedLogAPIEntity>, EntityDefinition> classMap = new ConcurrentHashMap<Class<? extends TaggedLogAPIEntity>, EntityDefinition>();
+ // field/value class -> serializer/deserializer (built-ins from the static initializer plus registerSerDeser())
+ private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new ConcurrentHashMap<Class<?>, EntitySerDeser<?>>();
+ // built-in class -> internal numeric ID (populated only by the static initializer)
+ private static Map<Class<?>, Integer> _serDeserClassIDMap = new ConcurrentHashMap<Class<?>, Integer>();
+ // internal numeric ID -> built-in class (inverse of _serDeserClassIDMap)
+ private static Map<Integer, Class<?>> _serIDDeserClassMap = new ConcurrentHashMap<Integer, Class<?>>();
+ // table name -> (hashCode of entity prefix -> entity definition)
+ private static Map<String, Map<Integer, EntityDefinition>> entityPrefixMap = new ConcurrentHashMap<String, Map<Integer, EntityDefinition>>();
+ // table name -> (hashCode of index prefix -> index definition)
+ private static Map<String, Map<Integer, IndexDefinition>> indexPrefixMap = new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>();
+
+ // Registers the built-in serializers. IDs are handed out sequentially from 0 in the order
+ // below, so reordering or inserting entries changes the ID of every later class.
+ // NOTE(review): presumably these IDs are exchanged between processes; confirm before reordering.
+ static {
+     int id = 0;
+     registerBuiltin(id++, NullObject.class, new NullSerDeser());
+     registerBuiltin(id++, String.class, new StringSerDeser());
+     registerBuiltin(id++, long.class, new LongSerDeser());
+     registerBuiltin(id++, Long.class, new LongSerDeser());
+     registerBuiltin(id++, int.class, new IntSerDeser());
+     registerBuiltin(id++, Integer.class, new IntSerDeser());
+     registerBuiltin(id++, Double.class, new DoubleSerDeser());
+     registerBuiltin(id++, double.class, new DoubleSerDeser());
+     registerBuiltin(id++, int[].class, new IntArraySerDeser());
+     registerBuiltin(id++, double[].class, new DoubleArraySerDeser());
+     registerBuiltin(id++, double[][].class, new Double2DArraySerDeser());
+     registerBuiltin(id++, Boolean.class, new BooleanSerDeser());
+     registerBuiltin(id++, boolean.class, new BooleanSerDeser());
+     registerBuiltin(id++, String[].class, new StringArraySerDeser());
+     registerBuiltin(id++, Map.class, new MapSerDeser());
+     registerBuiltin(id++, List.class, new ListSerDeser());
+ }
+
+ /** Records a built-in class in the serializer map and in both directions of the ID mapping. */
+ private static void registerBuiltin(int id, Class<?> clazz, EntitySerDeser<?> serDeser) {
+     _serDeserMap.put(clazz, serDeser);
+     _serIDDeserClassMap.put(id, clazz);
+     _serDeserClassIDMap.put(clazz, id);
+ }
+
+
+
+ /**
+  * Looks up the serializer/deserializer registered for exactly the given class.
+  *
+  * @param clazz the class to look up
+  * @return the registered instance, or {@code null} when none is registered for {@code clazz}
+  */
+ @SuppressWarnings("rawtypes")
+ public static EntitySerDeser getSerDeser(Class<?> clazz){
+     final EntitySerDeser registered = _serDeserMap.get(clazz);
+     return registered;
+ }
+
+ /**
+  * Returns the internal ID assigned to a predefined registered class.
+  *
+  * @param clazz the class used for serialization/deserialization
+  * @return the internal ID when {@code clazz} has one, otherwise -1
+  */
+ public static int getIDBySerDerClass(Class<?> clazz) {
+     final Integer registeredId = _serDeserClassIDMap.get(clazz);
+     return registeredId == null ? -1 : registeredId;
+ }
+
+
+ /**
+  * Returns the predefined registered class for an internal ID.
+  *
+  * @param id the internal class ID
+  * @return the class registered under {@code id}, or {@code null} when there is none
+  */
+ public static Class<?> getClassByID(int id) {
+     final Class<?> registered = _serIDDeserClassMap.get(id);
+     return registered;
+ }
+
+ /**
+  * Builds the definition of a user-supplied entity class from its annotations and registers it
+  * under the service name carried by that definition.
+  *
+  * @param clazz entity class
+  * @throws IllegalArgumentException when the definition cannot be built or registered
+  */
+ public static void registerEntity(Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{
+     final EntityDefinition definition = createEntityDefinition(clazz);
+     registerEntity(definition);
+ }
+
+ /**
+  * Builds the definition of a user-supplied entity class and registers it under an explicit
+  * service name.
+  *
+  * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities
+  * @param serviceName entity service name
+  * @param clazz entity class
+  * @throws IllegalArgumentException when the definition cannot be built or registered
+  */
+ @Deprecated
+ public static void registerEntity(String serviceName, Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{
+     final EntityDefinition definition = createEntityDefinition(clazz);
+     registerEntity(serviceName, definition);
+ }
+
+ /**
+  * Registers a user-supplied entity definition under the service name returned by
+  * {@code entityDef.getService()}.
+  *
+  * @param entityDef entity definition
+  * @throws IllegalArgumentException when the registration is rejected
+  */
+ public static void registerEntity(EntityDefinition entityDef) {
+     final String serviceName = entityDef.getService();
+     registerEntity(serviceName, entityDef);
+ }
+
+ /**
+  * Registers a user-supplied entity definition under the given service name.
+  *
+  * <p>When the service name is already taken, the call is a no-op if the existing definition
+  * describes the same entity class, and is rejected otherwise. (The check used to compare the
+  * classes of the two {@code EntityDefinition} objects, which are always equal, so a conflicting
+  * registration was silently dropped.) The duplicate check and the registration run under one lock
+  * so two threads cannot both pass the check for the same service name.
+  *
+  * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities.
+  *
+  * @param serviceName entity service name
+  * @param entityDef entity definition
+  * @throws IllegalArgumentException when the service name is already registered by a different
+  *         entity class, or when the prefix check fails
+  */
+ public static void registerEntity(String serviceName, EntityDefinition entityDef) {
+     final String table = entityDef.getTable();
+     final Class<? extends TaggedLogAPIEntity> entityClass = entityDef.getEntityClass();
+     synchronized (EntityDefinitionManager.class) {
+         final EntityDefinition existing = entityServiceMap.get(serviceName);
+         if (existing != null) {
+             final Class<? extends TaggedLogAPIEntity> existingClass = existing.getEntityClass();
+             // Same entity class registered again: nothing to do.
+             if (existingClass == entityClass || (existingClass != null && existingClass.equals(entityClass))) {
+                 return;
+             }
+             final String existingName = existingClass == null ? null : existingClass.getName();
+             final String newName = entityClass == null ? null : entityClass.getName();
+             throw new IllegalArgumentException("Service " + serviceName + " has already been registered by " + existingName + ", so class " + newName + " can NOT be registered");
+         }
+         checkPrefix(entityDef);
+         entityServiceMap.put(serviceName, entityDef);
+         Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(table);
+         if (entityHashMap == null) {
+             entityHashMap = new ConcurrentHashMap<Integer, EntityDefinition>();
+             entityPrefixMap.put(table, entityHashMap);
+         }
+         // Prefixes are indexed by their hash code within a table.
+         entityHashMap.put(entityDef.getPrefix().hashCode(), entityDef);
+         final IndexDefinition[] indexes = entityDef.getIndexes();
+         if (indexes != null) {
+             for (IndexDefinition index : indexes) {
+                 Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(table);
+                 if (indexHashMap == null) {
+                     indexHashMap = new ConcurrentHashMap<Integer, IndexDefinition>();
+                     indexPrefixMap.put(table, indexHashMap);
+                 }
+                 indexHashMap.put(index.getIndexPrefix().hashCode(), index);
+             }
+         }
+         classMap.put(entityClass, entityDef);
+     }
+     if (LOG.isDebugEnabled()) {
+         LOG.debug(entityDef.getEntityClass().getSimpleName() + " entity registered successfully, table name: " + entityDef.getTable() +
+                 ", prefix: " + entityDef.getPrefix() + ", service: " + serviceName + ", CF: " + entityDef.getColumnFamily());
+     } else {
+         LOG.info(String.format("Registered %s (%s)", entityDef.getEntityClass().getSimpleName(), serviceName));
+     }
+ }
+
+ /**
+  * Rejects a definition whose entity prefix or index prefixes collide (by hash code) with prefixes
+  * already registered for the same table. Nothing is checked when no entity has been registered
+  * for that table yet.
+  *
+  * <p>The error messages name the entity class being registered. They used to print
+  * {@code entityDef.getClass()}, which is always {@code EntityDefinition}.
+  *
+  * @param entityDef the definition about to be registered
+  * @throws IllegalArgumentException when a prefix hash code is already taken by another
+  *         entity or index definition
+  */
+ private static void checkPrefix(EntityDefinition entityDef) {
+     final Integer entityPrefixHashcode = entityDef.getPrefix().hashCode();
+     final Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(entityDef.getTable());
+     if (entityHashMap == null) {
+         return;
+     }
+     final Class<?> entityClass = entityDef.getEntityClass();
+     final String entityName = entityClass != null ? entityClass.getName() : entityDef.getClass().getName();
+
+     final EntityDefinition prefixOwner = entityHashMap.get(entityPrefixHashcode);
+     // EntityDefinition does not override equals(), so this is an identity comparison.
+     if (prefixOwner != null && !entityDef.equals(prefixOwner)) {
+         throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The entity prefix " + entityDef.getPrefix() + " has already been registered by entity service " + prefixOwner.getService());
+     }
+     final IndexDefinition[] indexes = entityDef.getIndexes();
+     if (indexes == null) {
+         return;
+     }
+     for (IndexDefinition index : indexes) {
+         final Integer indexPrefixHashcode = index.getIndexPrefix().hashCode();
+         // An index prefix must not collide with any entity prefix of the table ...
+         final EntityDefinition conflictingEntity = entityHashMap.get(indexPrefixHashcode);
+         if (conflictingEntity != null) {
+             throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + conflictingEntity.getService());
+         }
+         // ... nor with a different index already registered for the table.
+         final Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(entityDef.getTable());
+         if (indexHashMap != null) {
+             final IndexDefinition conflictingIndex = indexHashMap.get(indexPrefixHashcode);
+             if (conflictingIndex != null && !index.equals(conflictingIndex)) {
+                 throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + conflictingIndex.getEntityDefinition().getService());
+             }
+         }
+     }
+ }
+
+ /**
+  * Returns the entity definition registered under a service name, running the one-time
+  * repository scan first if it has not happened yet.
+  *
+  * @param serviceName the service name to look up
+  * @return the matching definition, or {@code null} when the service name is unknown
+  * @throws IllegalAccessException propagated from the one-time initialization
+  * @throws InstantiationException propagated from the one-time initialization
+  */
+ public static EntityDefinition getEntityByServiceName(String serviceName) throws InstantiationException, IllegalAccessException{
+     checkInit();
+     final EntityDefinition definition = entityServiceMap.get(serviceName);
+     return definition;
+ }
+
+ /**
+  * Returns the entity definition registered for an entity class, running the one-time
+  * repository scan first if it has not happened yet.
+  *
+  * @param clazz the entity class to look up
+  * @return the matching definition, or {@code null} when the class has not been registered
+  * @throws InstantiationException propagated from the one-time initialization
+  * @throws IllegalAccessException propagated from the one-time initialization
+  */
+ public static EntityDefinition getEntityDefinitionByEntityClass(Class<? extends TaggedLogAPIEntity> clazz) throws InstantiationException, IllegalAccessException {
+     checkInit();
+     final EntityDefinition definition = classMap.get(clazz);
+     return definition;
+ }
+
+ /**
+  * Runs {@code EntityRepositoryScanner.scan()} once. The {@code initialized} flag is tested
+  * before and after taking the class lock, and is set only after the scan returns normally,
+  * so a failed scan is retried on the next call.
+  *
+  * @throws InstantiationException propagated from the scan
+  * @throws IllegalAccessException propagated from the scan
+  */
+ private static void checkInit() throws InstantiationException, IllegalAccessException {
+     if (initialized) {
+         return;
+     }
+     synchronized (EntityDefinitionManager.class) {
+         if (initialized) {
+             return;
+         }
+         EntityRepositoryScanner.scan();
+         initialized = true;
+     }
+ }
+
+ /**
+ * User can register their own field SerDeser. An existing registration for the same class is replaced.
+ * Only the class-to-SerDeser map is updated; no internal class ID is assigned here, so
+ * getIDBySerDerClass(clazz) keeps returning -1 for classes registered only through this method.
+ * NOTE(review): confirm whether user-registered classes are expected to get an ID.
+ * @param clazz class of the the SerDeser
+ * @param entitySerDeser entity or field SerDeser
+ * @throws IllegalArgumentException
+ */
+ public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> entitySerDeser) {
+ _serDeserMap.put(clazz, entitySerDeser);
+ }
+
+ /**
+  * Checks whether an entity class is declared as time series through its {@code TimeSeries}
+  * annotation.
+  *
+  * @param clazz the entity class to inspect
+  * @return the annotation's value, or {@code false} when the annotation is absent
+  */
+ public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> clazz){
+     final TimeSeries annotation = clazz.getAnnotation(TimeSeries.class);
+     if (annotation == null) {
+         return false;
+     }
+     return annotation.value();
+ }
+
+ /**
+ * Builds an EntityDefinition for an entity class from its annotations. The definition is only
+ * created here, not registered.
+ *
+ * Required class annotations: Table, ColumnFamily and Prefix (each with a non-empty value) and
+ * TimeSeries (must be present). Optional: Service (defaults to the simple class name), Metric,
+ * Partition, Indexes and ServicePath.
+ *
+ * Every field declared directly on the class that carries a non-empty Column annotation becomes
+ * a qualifier; its type must have a registered SerDeser and the class must expose a public
+ * getXxx() or isXxx() method for it.
+ *
+ * @param cls entity class to inspect
+ * @return the populated definition
+ * @throws IllegalArgumentException when a required annotation is missing or empty, a metric
+ * entity is not time series or has a column that is not int[], long[] or double[], a column
+ * type has no SerDeser, a getter is missing, or a partition name refers to a column
+ */
+ @SuppressWarnings("unchecked")
+ public static EntityDefinition createEntityDefinition(Class<? extends TaggedLogAPIEntity> cls) {
+
+ final EntityDefinition ed = new EntityDefinition();
+
+ ed.setEntityClass(cls);
+ // parse cls' annotations
+ Table table = cls.getAnnotation(Table.class);
+ if(table == null || table.value().isEmpty()){
+ throw new IllegalArgumentException("Entity class must have a non-empty table name annotated with @Table");
+ }
+ String tableName = table.value();
+ // Optionally prefix the table name with "<env>_", as decided by the Eagle configuration.
+ if(EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()){
+ tableName = EagleConfigFactory.load().getEnv() + "_" + tableName;
+ }
+ ed.setTable(tableName);
+
+ ColumnFamily family = cls.getAnnotation(ColumnFamily.class);
+ if(family == null || family.value().isEmpty()){
+ throw new IllegalArgumentException("Entity class must have a non-empty column family name annotated with @ColumnFamily");
+ }
+ ed.setColumnFamily(family.value());
+
+ Prefix prefix = cls.getAnnotation(Prefix.class);
+ if(prefix == null || prefix.value().isEmpty()){
+ throw new IllegalArgumentException("Entity class must have a non-empty prefix name annotated with @Prefix");
+ }
+ ed.setPrefix(prefix.value());
+
+ // The annotation must be present here, unlike isTimeSeries(Class), which treats a missing annotation as false.
+ TimeSeries ts = cls.getAnnotation(TimeSeries.class);
+ if(ts == null){
+ throw new IllegalArgumentException("Entity class must have a non-empty timeseries name annotated with @TimeSeries");
+ }
+ ed.setTimeSeries(ts.value());
+
+ // The service name falls back to the simple class name when @Service is absent or empty.
+ Service service = cls.getAnnotation(Service.class);
+ if(service == null || service.value().isEmpty()){
+ ed.setService(cls.getSimpleName());
+ } else {
+ ed.setService(service.value());
+ }
+
+ Metric m = cls.getAnnotation(Metric.class);
+ // Filled by checkFieldTypeForMetric(): field name -> element type of the array-typed metric field.
+ Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, Class<?>>();
+ if(m != null){
+ // metric has to be timeseries
+ if(!ts.value()){
+ throw new IllegalArgumentException("Metric entity must be time series as well");
+ }
+ MetricDefinition md = new MetricDefinition();
+ md.setInterval(m.interval());
+ ed.setMetricDefinition(md);
+ }
+
+ // getDeclaredFields() returns only fields declared by cls itself, not inherited ones.
+ java.lang.reflect.Field[] fields = cls.getDeclaredFields();
+ for(java.lang.reflect.Field f : fields){
+ Column column = f.getAnnotation(Column.class);
+ if(column == null || column.value().isEmpty()){
+ continue;
+ }
+ Class<?> fldCls = f.getType();
+ // intrusive check field type for metric entity
+ checkFieldTypeForMetric(ed.getMetricDefinition(), f.getName(), fldCls, dynamicFieldTypes);
+ Qualifier q = new Qualifier();
+ q.setDisplayName(f.getName());
+ q.setQualifierName(column.value());
+ // Lookup is by the exact declared type; no supertype or interface matching is attempted.
+ EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls);
+ if(serDeser == null){
+ throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() +
+ " of entity " + cls.getSimpleName() + " has no serializer associated ");
+ } else {
+ q.setSerDeser((EntitySerDeser<Object>)serDeser);
+ }
+ // The same Qualifier is indexed twice: by column (qualifier) name and by Java field name.
+ ed.getQualifierNameMap().put(q.getQualifierName(), q);
+ ed.getDisplayNameMap().put(q.getDisplayName(), q);
+ // TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils
+ final String propertyName = f.getName().substring(0,1).toUpperCase() + f.getName().substring(1);
+ String getterName = "get" + propertyName;
+ try {
+ Method method = cls.getMethod(getterName);
+ ed.getQualifierGetterMap().put(f.getName(), method);
+ } catch (Exception e) {
+ // Check if the type is boolean
+ getterName = "is" + propertyName;
+ try {
+ Method method = cls.getMethod(getterName);
+ ed.getQualifierGetterMap().put(f.getName(), method);
+ } catch (Exception e1) {
+ // NOTE(review): the message names the "is" getter but the attached cause is the failure of the "get" lookup (e, not e1).
+ throw new IllegalArgumentException("Field " + f.getName() + " hasn't defined valid getter method: " + getterName, e);
+ }
+ }
+ if(LOG.isDebugEnabled()) LOG.debug("Field registered " + q);
+ }
+
+ // TODO: Lazy create because not used at all
+ // dynamically create bean class
+ if(ed.getMetricDefinition() != null){
+ Class<?> metricCls = createDynamicClassForMetric(cls.getName()+"_SingleTimestamp", dynamicFieldTypes);
+ ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls);
+ }
+
+ // Must run after the field loop: isTag() consults the qualifier/display-name maps filled above.
+ final Partition partition = cls.getAnnotation(Partition.class);
+ if (partition != null) {
+ final String[] partitions = partition.value();
+ ed.setPartitions(partitions);
+ // Check if partition fields are all tag fields. Partition field can't be column field, must be tag field.
+ for (String part : partitions) {
+ if (!ed.isTag(part)) {
+ throw new IllegalArgumentException("Partition field can't be column field, must be tag field. "
+ + "Partition name: " + part);
+ }
+ }
+ }
+
+ final Indexes indexes = cls.getAnnotation(Indexes.class);
+ if (indexes != null) {
+ final Index[] inds = indexes.value();
+ final IndexDefinition[] indexDefinitions = new IndexDefinition[inds.length];
+ for (int i = 0; i < inds.length; ++i) {
+ final Index ind = inds[i];
+ indexDefinitions[i] = new IndexDefinition(ed, ind);
+ }
+ ed.setIndexes(indexDefinitions);
+ }
+
+ // Only the service creation path is set here; the service deletion path is never set by this method.
+ final ServicePath path = cls.getAnnotation(ServicePath.class);
+ if (path != null) {
+ if (path.path() != null && (!path.path().isEmpty())) {
+ ed.setServiceCreationPath(path.path());
+ }
+ }
+
+ return ed;
+ }
+
+ private static void checkFieldTypeForMetric(MetricDefinition md, String fieldName, Object fldCls, Map<String, Class<?>> dynamicFieldTypes){
+ if(md != null){
+ if(fldCls.equals(int[].class)){
+ dynamicFieldTypes.put(fieldName, int.class);
+ return;
+ }else if(fldCls.equals(long[].class)){
+ dynamicFieldTypes.put(fieldName, long.class);
+ return;
+ }else if(fldCls.equals(double[].class)){
+ dynamicFieldTypes.put(fieldName, double.class);
+ return;
+ }
+ throw new IllegalArgumentException("Fields for metric entity must be one of int[], long[] or double[]");
+ }
+ }
+
+ private static Class<?> createDynamicClassForMetric(final String className, Map<String, Class<?>> dynamicFieldTypes){
+ BeanGenerator beanGenerator = new BeanGenerator();
+ beanGenerator.setNamingPolicy(new NamingPolicy(){
+ @Override
+ public String getClassName(String prefix,String source, Object key, Predicate names){
+ return className;
+ }});
+ BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes);
+ beanGenerator.setSuperclass(TaggedLogAPIEntity.class);
+ return (Class<?>) beanGenerator.createClass();
+ }
+
+ /**
+  * Returns all registered entity definitions keyed by service name, running the one-time
+  * repository scan first if needed. The returned map is the live internal map, not a copy.
+  *
+  * @return service name to entity definition
+  * @throws Exception propagated from the one-time initialization
+  */
+ public static Map<String, EntityDefinition> entities() throws Exception{
+     checkInit();
+     final Map<String, EntityDefinition> registered = entityServiceMap;
+     return registered;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
new file mode 100755
index 0000000..25d55e0
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * Contract for converting values of type {@code T} to and from a byte array.
+ *
+ * @param <T> the Java type handled by an implementation
+ */
+public interface EntitySerDeser<T> {
+    /**
+     * Rebuilds a value from its serialized form.
+     *
+     * @param bytes serialized form
+     * @return the decoded value
+     */
+    T deserialize(byte[] bytes);
+
+    /**
+     * Produces the serialized form of a value.
+     *
+     * @param t value to encode
+     * @return serialized form
+     */
+    byte[] serialize(T t);
+
+    /**
+     * @return the class object of the handled type
+     */
+    Class<T> type();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
new file mode 100755
index 0000000..a7ec4e4
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.beans.PropertyDescriptor;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntitySerDeserializer {
+ private static final Logger LOG = LoggerFactory.getLogger(EntitySerDeserializer.class);
+
+ // TODO throws seperate exceptions
+ @SuppressWarnings("unchecked")
+ public <T> T readValue(Map<String, byte[]> qualifierValues, EntityDefinition ed) throws Exception{
+ Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass();
+ if(clazz == null){
+ throw new NullPointerException("Entity class of service "+ed.getService()+" is null");
+ }
+ TaggedLogAPIEntity obj = clazz.newInstance();
+ Map<String, Qualifier> map = ed.getQualifierNameMap();
+ for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
+ Qualifier q = map.get(entry.getKey());
+ if(q == null){
+ // if it's not pre-defined qualifier, it must be tag unless it's a bug
+ if(obj.getTags() == null){
+ obj.setTags(new HashMap<String, String>());
+ }
+ obj.getTags().put(entry.getKey(), new StringSerDeser().deserialize(entry.getValue()));
+ continue;
+ }
+
+ // TODO performance loss compared with new operator
+ // parse different types of qualifiers
+ String fieldName = q.getDisplayName();
+ PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName);
+ if(entry.getValue() != null){
+ Object args = q.getSerDeser().deserialize(entry.getValue());
+ pd.getWriteMethod().invoke(obj, args);
+// if (logger.isDebugEnabled()) {
+// logger.debug(entry.getKey() + ":" + args + " is deserialized");
+// }
+ }
+ }
+ return (T)obj;
+ }
+
+ public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, EntityDefinition ed) throws Exception{
+ Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>();
+ // iterate all modified qualifiers
+ for(String fieldName : entity.modifiedQualifiers()){
+ PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, fieldName);
+ Object obj = pd.getReadMethod().invoke(entity);
+ Qualifier q = ed.getDisplayNameMap().get(fieldName);
+ EntitySerDeser<Object> ser = q.getSerDeser();
+ byte[] value = ser.serialize(obj);
+ qualifierValues.put(q.getQualifierName(), value);
+ }
+ return qualifierValues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
new file mode 100755
index 0000000..c7dc113
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Describes one index declared on a type: its name, the ordered columns it covers
+ * and whether it is unique. Retained at runtime so it can be read reflectively.
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Index {
+
+    /** Name of the index. */
+    String name();
+
+    /** Names of the columns that make up the index. */
+    String[] columns();
+
+    /** Whether the index is unique; there is no default, so it must always be given. */
+    boolean unique();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
new file mode 100755
index 0000000..2e62420
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.query.parser.ANDExpression;
+import org.apache.eagle.query.parser.AtomicExpression;
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.ORExpression;
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Eagle index schema definition.
+ *
+ * 1. Index schema can be defined in entity class by annotation.
+ * 2. One index schema can contain multiple fields/tags, defined in order
+ * 3. We only support immutable indexing for now
+ * 4. When entity is created or deleted, the corresponding index entity should be created or deleted at the same time
+ * 5. Index transparency to queries. Queries go through index when and only when index can serve all search conditions after query rewrite
+ *
+ * Index rowkey layout, as produced by this class:
+ * 4-byte hash of the index prefix, one 4-byte hash per partition, then for every index column
+ * a 2-byte length followed by the value bytes, and for non-unique indexes built from an entity
+ * a 4-byte key hash and 4-byte value hash per entry of the sorted tag map.
+ */
+public class IndexDefinition {
+
+    public enum IndexType {
+        UNIQUE_INDEX,
+        NON_CLUSTER_INDEX,
+        NON_INDEX
+    }
+
+    private final EntityDefinition entityDef;
+    private final Index index;
+    private final IndexColumn[] columns;
+    private final String indexPrefix;
+
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+    public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
+    // value lengths are written into the rowkey as 2 bytes, hence the 16-bit limit
+    public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535;
+
+    private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$";
+    private static final Pattern FIELD_NAME_PATTERN = Pattern.compile(FIELD_NAME_PATTERN_STRING);
+    private final static Logger LOG = LoggerFactory.getLogger(IndexDefinition.class);
+
+    /**
+     * Builds the definition of one index of an entity.
+     * The index prefix is the entity prefix, an underscore and the index name. Each column
+     * named by the annotation is resolved to either a tag or a qualifier looked up by display name.
+     *
+     * @param entityDef definition of the indexed entity
+     * @param index index annotation
+     */
+    public IndexDefinition(EntityDefinition entityDef, Index index) {
+        this.entityDef = entityDef;
+        this.index = index;
+        this.indexPrefix = entityDef.getPrefix() + "_" + index.name();
+        final String[] indexColumns = index.columns();
+        this.columns = new IndexColumn[indexColumns.length];
+        for (int i = 0; i < indexColumns.length; ++i) {
+            final String name = indexColumns[i];
+            final boolean isTag = entityDef.isTag(name);
+            final Qualifier qualifier = isTag ? null : entityDef.getDisplayNameMap().get(name);
+            columns[i] = new IndexColumn(name, isTag, qualifier);
+        }
+        LOG.info("Created index " + index.name() + " for " + entityDef.getEntityClass().getSimpleName());
+    }
+
+    public EntityDefinition getEntityDefinition() {
+        return entityDef;
+    }
+
+    public Index getIndex() {
+        return index;
+    }
+
+    public String getIndexName() {
+        return index.name();
+    }
+
+    public IndexColumn[] getIndexColumns() {
+        return columns;
+    }
+
+    public String getIndexPrefix() {
+        return indexPrefix;
+    }
+
+    public boolean isUnique() {
+        return index.unique();
+    }
+
+    /**
+     * Check if the query is suitable to go through index, and if so compute one index rowkey per AND branch.
+     * Only equality conditions on attributes written as {@code @name} are considered.
+     * TODO: currently index fields should be string type.
+     *
+     * @param query query expression after re-write
+     * @param rowkeys optional output list; it is cleared first (unless the query is empty) and receives one
+     *                rowkey per AND branch. When {@link IndexType#NON_INDEX} is returned it may already
+     *                contain rowkeys of earlier branches and should be ignored.
+     * @return {@link IndexType#NON_INDEX} if some AND branch has no usable equality value for an index column;
+     *         otherwise {@link IndexType#UNIQUE_INDEX} for a unique index and
+     *         {@link IndexType#NON_CLUSTER_INDEX} for a non-unique one. A null or empty query yields
+     *         {@link IndexType#NON_CLUSTER_INDEX} without touching {@code rowkeys}.
+     * @throws IllegalArgumentException if an AND branch lacks an equality condition on a partition field
+     */
+    public IndexType canGoThroughIndex(ORExpression query, List<byte[]> rowkeys) {
+        if (query == null || query.getANDExprList() == null || query.getANDExprList().isEmpty())
+            return IndexType.NON_CLUSTER_INDEX;
+        if (rowkeys != null) {
+            rowkeys.clear();
+        }
+        final Map<String, String> indexfieldMap = new HashMap<String, String>();
+        for(ANDExpression andExpr : query.getANDExprList()) {
+            indexfieldMap.clear();
+            for(AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+                final String fieldName = parseEntityAttribute(ae.getKey());
+                if(fieldName != null && ComparisonOperator.EQUAL.equals(ae.getOp())){
+                    indexfieldMap.put(fieldName, ae.getValue());
+                }
+            }
+            final String[] partitions = entityDef.getPartitions();
+            int[] partitionValueHashs = null;
+            if (partitions != null) {
+                partitionValueHashs = new int[partitions.length];
+                for (int i = 0; i < partitions.length; ++i) {
+                    final String value = indexfieldMap.get(partitions[i]);
+                    if (value == null) {
+                        throw new IllegalArgumentException("Partition " + partitions[i] + " is not defined in the query: " + query.toString());
+                    }
+                    partitionValueHashs[i] = value.hashCode();
+                }
+            }
+            final byte[][] indexFieldValues = new byte[columns.length][];
+            for (int i = 0; i < columns.length; ++i) {
+                final String value = indexfieldMap.get(columns[i].getColumnName());
+                if (value == null) {
+                    // Column missing (or compared with null): we have to scan anyway,
+                    // so there's no need to go through index
+                    return IndexType.NON_INDEX;
+                }
+                // must use the same charset as generateIndexValues, otherwise lookups miss
+                indexFieldValues[i] = value.getBytes(UTF_8_CHARSET);
+            }
+            final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null);
+            if (rowkeys != null) {
+                rowkeys.add(rowkey);
+            }
+        }
+        if (index.unique()) {
+            return IndexType.UNIQUE_INDEX;
+        }
+        return IndexType.NON_CLUSTER_INDEX;
+    }
+
+    /**
+     * Extracts the attribute name from a query key of the form {@code @name}.
+     *
+     * @return the text after the leading {@code @}, or null if the key is null or has another form
+     */
+    private String parseEntityAttribute(String fieldName) {
+        if (fieldName == null) {
+            return null;
+        }
+        Matcher m = FIELD_NAME_PATTERN.matcher(fieldName);
+        if(m.find()){
+            return m.group(1);
+        }
+        return null;
+    }
+
+    /**
+     * Generates the index rowkey of an entity. For a non-unique index the hashes of the
+     * entity's sorted tag map are appended after the index values.
+     *
+     * @param entity entity whose class must be exactly the entity class of this index
+     * @return the index rowkey
+     * @throws IllegalArgumentException if the entity has a different class, or an index value exceeds
+     *         {@link #MAX_INDEX_VALUE_BYTE_LENGTH} bytes
+     */
+    // TODO: We should move index rowkey generation later since this class is for general purpose, not only for hbase.
+    public byte[] generateIndexRowkey(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        if (entity.getClass() != entityDef.getEntityClass()) {
+            throw new IllegalArgumentException("Expected entity class: " + entityDef.getEntityClass().getName() + ", but got class " + entity.getClass().getName());
+        }
+        final byte[][] indexValues = generateIndexValues(entity);
+        final int[] partitionHashCodes = generatePartitionHashCodes(entity);
+        SortedMap<Integer, Integer> tagMap = null;
+        if (!index.unique()) {
+            // non cluster index
+            tagMap = RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags());
+        }
+
+        return generateUniqueIndexRowkey(indexValues, partitionHashCodes, tagMap);
+    }
+
+    /**
+     * Assembles a rowkey from its parts; despite the name it is used for unique and non-unique indexes.
+     *
+     * @param indexValues one value per index column, none null
+     * @param partitionHashCodes partition hashes, or null when the entity has no partitions
+     * @param tagMap tag key hash to tag value hash, or null/empty to append nothing
+     */
+    private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] partitionHashCodes, SortedMap<Integer, Integer> tagMap) {
+        final int prefixHashCode = indexPrefix.hashCode();
+        final boolean hasTags = tagMap != null && (!tagMap.isEmpty());
+        int totalLength = 4;
+        totalLength += (partitionHashCodes != null) ? (4 * partitionHashCodes.length) : 0;
+
+        totalLength += (2 * indexValues.length);
+        for (final byte[] value : indexValues) {
+            totalLength += value.length;
+        }
+        if (hasTags) {
+            totalLength += tagMap.size() * 8;
+        }
+
+        int offset = 0;
+        final byte[] rowkey = new byte[totalLength];
+
+        // 1. set prefix
+        ByteUtil.intToBytes(prefixHashCode, rowkey, offset);
+        offset += 4;
+
+        // 2. set partition
+        if (partitionHashCodes != null) {
+            for (final int partitionHashCode : partitionHashCodes) {
+                ByteUtil.intToBytes(partitionHashCode, rowkey, offset);
+                offset += 4;
+            }
+        }
+
+        // 3. set index values, each preceded by its length in 2 bytes
+        for (final byte[] value : indexValues) {
+            ByteUtil.shortToBytes((short)value.length, rowkey, offset);
+            offset += 2;
+            System.arraycopy(value, 0, rowkey, offset, value.length);
+            offset += value.length;
+        }
+
+        // Check if it's non clustered index, then set the tag/value hash code
+        if (hasTags) {
+            // 4. set tag key/value hashes
+            for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) {
+                ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
+                offset += 4;
+                ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
+                offset += 4;
+            }
+        }
+
+        return rowkey;
+    }
+
+    /**
+     * Hashes the entity's tag value of every partition field.
+     *
+     * @return one hash per partition, {@link #EMPTY_PARTITION_DEFAULT_HASH_CODE} where the tag is absent
+     *         (including when the entity has no tags at all), or null if the entity definition has no partitions
+     */
+    private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) {
+        final String[] partitions = entityDef.getPartitions();
+        int[] result = null;
+        if (partitions != null) {
+            result = new int[partitions.length];
+            final Map<String, String> tags = entity.getTags();
+            for (int i = 0 ; i < partitions.length; ++i) {
+                final String tagValue = (tags == null) ? null : tags.get(partitions[i]);
+                if (tagValue != null) {
+                    result[i] = tagValue.hashCode();
+                } else {
+                    result[i] = EMPTY_PARTITION_DEFAULT_HASH_CODE;
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Collects the serialized value of every index column from the entity.
+     * Tag columns are encoded as UTF-8; other columns are read through the bean getter and
+     * serialized with the column qualifier's serializer. Missing values become an empty array.
+     *
+     * @throws IllegalArgumentException if a value is longer than {@link #MAX_INDEX_VALUE_BYTE_LENGTH} bytes
+     */
+    private byte[][] generateIndexValues(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+
+        final byte[][] result = new byte[columns.length][];
+        for (int i = 0; i < columns.length; ++i) {
+            final IndexColumn column = columns[i];
+            final String columnName = column.getColumnName();
+            if (column.isTag()) {
+                final Map<String, String> tags = entity.getTags();
+                final String tagValue = (tags == null) ? null : tags.get(columnName);
+                if (tagValue == null) {
+                    result[i] = EMPTY_VALUE;
+                } else {
+                    result[i] = tagValue.getBytes(UTF_8_CHARSET);
+                }
+            } else {
+                // the descriptor is looked up once and then remembered on the column
+                PropertyDescriptor pd = column.getPropertyDescriptor();
+                if (pd == null) {
+                    pd = PropertyUtils.getPropertyDescriptor(entity, columnName);
+                    column.setPropertyDescriptor(pd);
+                }
+                final Object value = pd.getReadMethod().invoke(entity);
+                if (value == null) {
+                    result[i] = EMPTY_VALUE;
+                } else {
+                    final Qualifier q = column.getQualifier();
+                    result[i] = q.getSerDeser().serialize(value);
+                }
+            }
+            if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) {
+                throw new IllegalArgumentException("Index field value exceeded the max length: " + MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: " + result[i].length);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Index column definition class
+     *
+     */
+    public static class IndexColumn {
+        private final String columnName;
+        private final boolean isTag;
+        // null for tag columns
+        private final Qualifier qualifier;
+        // filled in lazily on first use
+        private PropertyDescriptor propertyDescriptor;
+
+        public IndexColumn(String columnName, boolean isTag, Qualifier qualifier) {
+            this.columnName = columnName;
+            this.isTag = isTag;
+            this.qualifier = qualifier;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+        public boolean isTag() {
+            return isTag;
+        }
+
+        public Qualifier getQualifier() {
+            return qualifier;
+        }
+
+        public PropertyDescriptor getPropertyDescriptor() {
+            return propertyDescriptor;
+        }
+
+        public void setPropertyDescriptor(PropertyDescriptor propertyDescriptor) {
+            this.propertyDescriptor = propertyDescriptor;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
new file mode 100644
index 0000000..3c82a0a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Container annotation holding the {@link Index} declarations of a type.
+ * Retained at runtime so it can be read reflectively.
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Indexes {
+
+    /** The index declarations. */
+    Index[] value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
new file mode 100755
index 0000000..8831223
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializes an int array as a 4-byte element count followed by every element as a 4-byte int:
+ * {@code <count><int>*count}
+ */
+public class IntArraySerDeser implements EntitySerDeser<int[]>{
+
+    public IntArraySerDeser(){}
+
+    /**
+     * Decodes an int array.
+     *
+     * @param bytes serialized form
+     * @return the decoded array, or null if {@code bytes} is null or shorter than the 4-byte count
+     * @throws IllegalArgumentException if the stored count is negative or larger than the remaining payload allows
+     */
+    @Override
+    public int[] deserialize(byte[] bytes){
+        if(bytes == null || bytes.length < 4)
+            return null;
+        int offset = 0;
+        // get size of int array
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+        // validate before allocating so a corrupted count cannot trigger a huge allocation
+        if(size < 0 || 4L * size > bytes.length - offset){
+            throw new IllegalArgumentException("Invalid int array size " + size + " for " + bytes.length + " bytes");
+        }
+        final int[] values = new int[size];
+        for(int i=0; i<size; i++){
+            values[i] = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+        }
+        return values;
+    }
+
+    /**
+     * Encodes an int array.
+     *
+     * @param obj array to encode
+     * @return the serialized form, or null if {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(int[] obj){
+        if(obj == null)
+            return null;
+        final int size = obj.length;
+        final byte[] array = new byte[4 + 4*size];
+        int offset = 0;
+        // write straight into the result instead of copying from a temporary array per element
+        ByteUtil.intToBytes(size, array, offset);
+        offset += 4;
+        for(int i=0; i<size; i++){
+            ByteUtil.intToBytes(obj[i], array, offset);
+            offset += 4;
+        }
+        return array;
+    }
+
+    @Override
+    public Class<int[]> type() {
+        return int[].class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
new file mode 100755
index 0000000..695badd
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializes an {@link Integer} through {@code ByteUtil.intToBytes} / {@code ByteUtil.bytesToInt}.
+ */
+public class IntSerDeser implements EntitySerDeser<Integer>{
+    public IntSerDeser(){}
+
+    /**
+     * @param bytes serialized form
+     * @return the decoded value, or null if {@code bytes} is null or shorter than 4 bytes
+     */
+    @Override
+    public Integer deserialize(byte[] bytes){
+        if(bytes == null || bytes.length < 4)
+            return null;
+        return Integer.valueOf(ByteUtil.bytesToInt(bytes));
+    }
+
+    /**
+     * @param obj value to encode
+     * @return the serialized form, or null if {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(Integer obj){
+        if(obj == null)
+            return null;
+        return ByteUtil.intToBytes(obj);
+    }
+
+    @Override
+    public Class<Integer> type() {
+        return Integer.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
new file mode 100644
index 0000000..eaf5e92
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serialization/deserialization for list type.
+ *
+ * Every element is stored with the type ID registered in {@link EntityDefinitionManager}
+ * so that it can be decoded with the matching serializer.
+ */
+@SuppressWarnings("rawtypes")
+public class ListSerDeser implements EntitySerDeser<List> {
+
+    /**
+     * Decodes a list written by {@link #serialize(List)}.
+     *
+     * @param bytes serialized form
+     * @return the decoded list, or null if {@code bytes} is null or empty
+     * @throws IllegalArgumentException if an element has an unknown type ID, no serializer is available
+     *         for its type, or its stored length does not fit into the remaining bytes
+     */
+    @SuppressWarnings({ "unchecked" })
+    @Override
+    public List deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        final List list = new ArrayList();
+        int offset = 0;
+        // get number of elements
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+
+        for (int i = 0; i < size; ++i) {
+            final int valueID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID);
+            if (valueClass == null) {
+                throw new IllegalArgumentException("Unsupported value type ID: " + valueID);
+            }
+            final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+            if (valueSerDer == null) {
+                // same check as on the write path
+                throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+            }
+            final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            // reject corrupted lengths before allocating
+            if (valueLength < 0 || valueLength > bytes.length - offset) {
+                throw new IllegalArgumentException("Invalid value length " + valueLength + " at offset " + offset);
+            }
+            final byte[] valueContent = new byte[valueLength];
+            System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+            offset += valueLength;
+            final Object value = valueSerDer.deserialize(valueContent);
+
+            list.add(value);
+        }
+        return list;
+    }
+
+    /**
+     * Encodes a list with the layout
+     *
+     * size + value1 type id + value length + value1 binary content + ...
+     * 4B 4B 4B value1 bytes
+     *
+     * Elements whose class has no registered type ID are encoded as List or Map when they
+     * implement one of those interfaces.
+     *
+     * @param list list to encode
+     * @return the serialized form, or null if {@code list} is null
+     * @throws IllegalArgumentException if an element is null, has an unsupported type,
+     *         or its serializer produces no bytes
+     */
+    @SuppressWarnings({ "unchecked" })
+    @Override
+    public byte[] serialize(List list) {
+        if(list == null)
+            return null;
+        final int size = list.size();
+        final int[] valueIDs = new int[size];
+        final byte[][] valueBytes = new byte[size][];
+
+        // 4 bytes for the element count plus type ID and length (4 bytes each) per element
+        int totalSize = 4 + size * 8;
+        int i = 0;
+        for (final Object value : list) {
+            if (value == null) {
+                throw new IllegalArgumentException("Unsupported null element at index " + i);
+            }
+            Class<?> valueClass = value.getClass();
+            int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+
+            if (valueTypeID == -1) {
+                // concrete collection classes are not registered; fall back to the interface
+                if (value instanceof List) {
+                    valueClass = List.class;
+                    valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                }
+                else if (value instanceof Map) {
+                    valueClass = Map.class;
+                    valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                }
+                else {
+                    throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+                }
+            }
+            valueIDs[i] = valueTypeID;
+            final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+            if (valueSerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+            }
+            valueBytes[i] = valueSerDer.serialize(value);
+            if (valueBytes[i] == null) {
+                throw new IllegalArgumentException("Serializer for " + valueClass.getName() + " returned no bytes for element at index " + i);
+            }
+            totalSize += valueBytes[i].length;
+            ++i;
+        }
+        final byte[] result = new byte[totalSize];
+        int offset = 0;
+        ByteUtil.intToBytes(size, result, offset);
+        offset += 4;
+        for (i = 0; i < size; ++i) {
+            ByteUtil.intToBytes(valueIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
+            offset += valueBytes[i].length;
+        }
+        return result;
+    }
+
+    @Override
+    public Class<List> type() {
+        return List.class;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
new file mode 100755
index 0000000..914cd95
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializes a {@link Long} through {@code ByteUtil.longToBytes} / {@code ByteUtil.bytesToLong}.
+ */
+public class LongSerDeser implements EntitySerDeser<Long>{
+    public LongSerDeser(){}
+
+    /**
+     * @param bytes serialized form
+     * @return the decoded value, or null if {@code bytes} is null or shorter than 8 bytes
+     */
+    @Override
+    public Long deserialize(byte[] bytes){
+        if(bytes == null || bytes.length < 8)
+            return null;
+        return Long.valueOf(ByteUtil.bytesToLong(bytes));
+    }
+
+    /**
+     * @param obj value to encode
+     * @return the serialized form, or null if {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(Long obj){
+        if(obj == null)
+            return null;
+        return ByteUtil.longToBytes(obj);
+    }
+
+    @Override
+    public Class<Long> type() {
+        return Long.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
new file mode 100755
index 0000000..d16fe3a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Serialization/deserialization for map type.
+ *
+ * <p>Binary layout produced by {@link #serialize(Map)} and consumed by {@link #deserialize(byte[])}:
+ * <pre>
+ * size | key1 type ID | key1 length | key1 bytes | value1 type ID | value1 length | value1 bytes | ...
+ *  4B  |      4B      |     4B      |            |       4B       |      4B       |
+ * </pre>
+ */
+@SuppressWarnings("rawtypes")
+public class MapSerDeser implements EntitySerDeser<Map> {
+
+    /**
+     * Rebuilds a map from its binary form.
+     *
+     * @param bytes encoded map; may be null or empty
+     * @return a {@link TreeMap} holding the decoded entries, or null when {@code bytes} is null or empty
+     * @throws IllegalArgumentException if a stored type ID maps to no class, or that class has no
+     *         registered ser/deser
+     */
+    @SuppressWarnings({ "unchecked" })
+    @Override
+    public Map deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        final Map map = new TreeMap();
+        int offset = 0;
+        // number of entries stored in the map
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+
+        for (int i = 0; i < size; ++i) {
+            final int keyID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final EntitySerDeser keySerDer = serDeserForTypeID(keyID, "key");
+            final int keyLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final byte[] keyContent = new byte[keyLength];
+            System.arraycopy(bytes, offset, keyContent, 0, keyLength);
+            offset += keyLength;
+            final Object key = keySerDer.deserialize(keyContent);
+
+            final int valueID = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final EntitySerDeser valueSerDer = serDeserForTypeID(valueID, "value");
+            final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            final byte[] valueContent = new byte[valueLength];
+            System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+            offset += valueLength;
+            final Object value = valueSerDer.deserialize(valueContent);
+
+            map.put(key, value);
+        }
+        return map;
+    }
+
+    /**
+     * Looks up the ser/deser registered for a stored type ID.
+     *
+     * @param typeID type ID read from the binary form
+     * @param role   "key" or "value"; only used in the error message
+     * @throws IllegalArgumentException if the ID maps to no class or the class has no ser/deser
+     */
+    private static EntitySerDeser serDeserForTypeID(int typeID, String role) {
+        final Class<?> clazz = EntityDefinitionManager.getClassByID(typeID);
+        if (clazz == null) {
+            throw new IllegalArgumentException("Unsupported " + role + " type ID: " + typeID);
+        }
+        final EntitySerDeser serDeser = EntityDefinitionManager.getSerDeser(clazz);
+        // Fail with the same kind of error serialize() raises instead of a later NullPointerException.
+        if (serDeser == null) {
+            throw new IllegalArgumentException("Unsupported class: " + clazz.getName());
+        }
+        return serDeser;
+    }
+
+    /**
+     * Encodes a map into its binary form.
+     *
+     * <pre>
+     * size + key1 type ID + key1 length + key1 binary content + value1 type id + value length + value1 binary content + ...
+     *  4B        4B             4B           key1 bytes              4B              4B            value1 bytes
+     * </pre>
+     *
+     * @param map map to encode; may be null
+     * @return the encoded bytes, or null when {@code map} is null
+     * @throws IllegalArgumentException if a key is null, or a key/value class has neither a
+     *         registered type ID (directly or as a {@link Map}) nor a ser/deser
+     */
+    @SuppressWarnings({ "unchecked" })
+    @Override
+    public byte[] serialize(Map map) {
+        if (map == null) {
+            return null;
+        }
+        final int size = map.size();
+        final int[] keyIDs = new int[size];
+        final int[] valueIDs = new int[size];
+        final byte[][] keyBytes = new byte[size][];
+        final byte[][] valueBytes = new byte[size][];
+
+        // 4 bytes for the entry count plus, per entry, four 4-byte ints (two type IDs, two lengths)
+        int totalSize = 4 + size * 16;
+        int i = 0;
+        for (final Object element : map.entrySet()) {
+            final Map.Entry entry = (Map.Entry) element;
+            final Object key = entry.getKey();
+            final Object value = entry.getValue();
+            if (key == null) {
+                throw new IllegalArgumentException("Unsupported null map key");
+            }
+            Class<?> keyClass = key.getClass();
+            // a null value is encoded through the NullObject placeholder type
+            Class<?> valueClass = (value != null) ? value.getClass() : NullObject.class;
+            int keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass);
+            int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+            if (keyTypeID == -1) {
+                // concrete Map implementations have no ID of their own; fall back to the Map type
+                if (key instanceof Map) {
+                    keyClass = Map.class;
+                    keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass);
+                } else {
+                    throw new IllegalArgumentException("Unsupported class: " + keyClass.getName());
+                }
+            }
+            if (valueTypeID == -1) {
+                if (value instanceof Map) {
+                    valueClass = Map.class;
+                    valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+                } else {
+                    throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+                }
+            }
+            keyIDs[i] = keyTypeID;
+            valueIDs[i] = valueTypeID;
+            final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass);
+            final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+            if (keySerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + keyClass.getName());
+            }
+            if (valueSerDer == null) {
+                throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+            }
+            keyBytes[i] = keySerDer.serialize(key);
+            valueBytes[i] = valueSerDer.serialize(value);
+            totalSize += keyBytes[i].length + valueBytes[i].length;
+            ++i;
+        }
+
+        final byte[] result = new byte[totalSize];
+        int offset = 0;
+        ByteUtil.intToBytes(size, result, offset);
+        offset += 4;
+        for (i = 0; i < size; ++i) {
+            ByteUtil.intToBytes(keyIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(keyBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(keyBytes[i], 0, result, offset, keyBytes[i].length);
+            offset += keyBytes[i].length;
+
+            ByteUtil.intToBytes(valueIDs[i], result, offset);
+            offset += 4;
+            ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+            offset += 4;
+            System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
+            offset += valueBytes[i].length;
+        }
+        return result;
+    }
+
+    /**
+     * @return the value type handled by this ser/deser, {@code Map.class}
+     */
+    @Override
+    public Class<Map> type() {
+        return Map.class;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
new file mode 100644
index 0000000..0e3e776
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that carries a metric interval.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Metric {
+    /**
+     * Interval in milliseconds; defaults to 60000.
+     */
+    long interval() default 60000L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
new file mode 100755
index 0000000..06bbed3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Hadoop {@link Writable} holding a metric's interval and an optional
+ * single-timestamp entity class.
+ */
+public class MetricDefinition implements Writable {
+    private final static Logger LOG = LoggerFactory.getLogger(MetricDefinition.class);
+    // written in place of the class name when no single-timestamp entity class is set
+    private final static String EMPTY = "";
+
+    private long interval;
+    private Class<?> singleTimestampEntityClass;
+
+    public long getInterval() {
+        return interval;
+    }
+
+    public void setInterval(long interval) {
+        this.interval = interval;
+    }
+
+    /**
+     * @return the single-timestamp entity class, or null when none is set
+     */
+    public Class<?> getSingleTimestampEntityClass() {
+        return singleTimestampEntityClass;
+    }
+
+    public void setSingleTimestampEntityClass(Class<?> singleTimestampEntityClass) {
+        this.singleTimestampEntityClass = singleTimestampEntityClass;
+    }
+
+    /**
+     * Writes the interval followed by the entity class name (an empty string when no class is set).
+     *
+     * @param out sink to write to
+     * @throws IOException if writing to {@code out} fails
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing metric definition: interval = "+interval+" singleTimestampEntityClass = "+ this.singleTimestampEntityClass);
+        }
+        out.writeLong(interval);
+        if (this.singleTimestampEntityClass == null) {
+            out.writeUTF(EMPTY);
+        } else {
+            out.writeUTF(this.singleTimestampEntityClass.getName());
+        }
+    }
+
+    /**
+     * Reads the fields in the order {@link #write(DataOutput)} emits them. The entity class is
+     * left null when the stored name is empty or cannot be loaded.
+     *
+     * @param in source to read from
+     * @throws IOException if reading from {@code in} fails
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        interval = in.readLong();
+        final String singleTimestampEntityClassName = in.readUTF();
+        // Reset first so a reused instance does not keep the class read by a previous call.
+        this.singleTimestampEntityClass = null;
+        if (!EMPTY.equals(singleTimestampEntityClassName)) {
+            try {
+                this.singleTimestampEntityClass = Class.forName(singleTimestampEntityClassName);
+            } catch (ClassNotFoundException e) {
+                // Logged at WARN regardless of whether DEBUG is enabled, keeping the cause.
+                LOG.warn("Class " + singleTimestampEntityClassName + " not found ", e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
new file mode 100755
index 0000000..9fb05a3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation for types, retained at runtime. It declares no elements.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface NonUniqueIndex {
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
new file mode 100755
index 0000000..ff11397
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Type-level annotation, retained at runtime, that groups several
+ * {@link NonUniqueIndex} annotations on one type.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface NonUniqueIndexes {
+
+    /**
+     * The grouped {@link NonUniqueIndex} annotations; there is no default, so a value is required.
+     */
+    NonUniqueIndex[] value();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
new file mode 100644
index 0000000..1b99fcd
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * Empty placeholder type with no fields or methods of its own.
+ */
+public class NullObject {
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
new file mode 100755
index 0000000..1778788
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * {@link EntitySerDeser} for the {@link NullObject} placeholder type: it always
+ * serializes to a zero-length array and always deserializes to null.
+ */
+public class NullSerDeser implements EntitySerDeser<NullObject> {
+
+    // single shared zero-length array returned by every serialize() call
+    private static final byte[] NO_BYTES = new byte[0];
+
+    /**
+     * @param bytes ignored
+     * @return always null
+     */
+    @Override
+    public NullObject deserialize(byte[] bytes) {
+        return null;
+    }
+
+    /**
+     * @param t ignored
+     * @return always the same zero-length array
+     */
+    @Override
+    public byte[] serialize(NullObject t) {
+        return NO_BYTES;
+    }
+
+    /**
+     * @return the value type handled by this ser/deser, {@code NullObject.class}
+     */
+    @Override
+    public Class<NullObject> type() {
+        return NullObject.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
new file mode 100644
index 0000000..cb60016
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares the partition fields of an Eagle entity, which affects how its rowkey is generated:
+ * once an entity class defines partition fields, their hash codes are placed right after the
+ * prefix field and before the timestamp field.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Partition {
+    /**
+     * Order in which annotated tags are to be regarded as data partitions.
+     * Defaults to an empty array.
+     */
+    String[] value() default {};
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
new file mode 100644
index 0000000..36f404c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that carries a prefix string.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Prefix {
+    /**
+     * The prefix; defaults to the empty string.
+     */
+    String value() default "";
+}
\ No newline at end of file