You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:22 UTC

[15/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
new file mode 100755
index 0000000..bed61af
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericMetricShadowEntity;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * This object should be regarded as read-only metadata for an entity: it is shared across all entity objects
+ * with the same entity name, so do not set different values for any of its fields;
+ * otherwise it is not thread safe.
+ */
+public class EntityDefinition implements Writable{
+	private final static Logger LOG = LoggerFactory.getLogger(EntityDefinition.class);
+
+	private Class<? extends TaggedLogAPIEntity> entityClass;
+	private String table;
+	private String columnFamily;
+	// TODO prefix be within search/get condition instead of entity definition. Topology entity should have pre-defined prefix. 
+	private String prefix;
+	private String service;
+	private String serviceCreationPath;
+	private String serviceDeletionPath;
+	private String[] partitions;
+	private Map<String, Qualifier> displayNameMap = new HashMap<String, Qualifier>();
+	private Map<String, Qualifier> qualifierNameMap = new HashMap<String, Qualifier>();
+	private Map<String, Method> qualifierGetterMap = new HashMap<String, Method>();
+	private boolean isTimeSeries;
+	private MetricDefinition metricDefinition;
+	private IndexDefinition[] indexes;
+	
+
+	/** Creates an empty definition: no scalar field is set and the three maps start out empty. */
+	public EntityDefinition(){}
+	
+	// Plain accessors follow. Every setter stores the reference it is given and every getter returns the
+	// internal reference (maps and arrays are not copied), so callers share state with this object.
+	public MetricDefinition getMetricDefinition() {
+		return metricDefinition;
+	}
+	public void setMetricDefinition(MetricDefinition metricDefinition) {
+		this.metricDefinition = metricDefinition;
+	}
+	public boolean isTimeSeries() {
+		return isTimeSeries;
+	}
+	public void setTimeSeries(boolean isTimeSeries) {
+		this.isTimeSeries = isTimeSeries;
+	}
+	public String getColumnFamily() {
+		return columnFamily;
+	}
+	public void setColumnFamily(String columnFamily) {
+		this.columnFamily = columnFamily;
+	}
+	public Class<? extends TaggedLogAPIEntity> getEntityClass() {
+		return entityClass;
+	}
+	public void setEntityClass(Class<? extends TaggedLogAPIEntity> entityClass) {
+		this.entityClass = entityClass;
+	}
+	public String getTable() {
+		return table;
+	}
+	public void setTable(String table) {
+		this.table = table;
+	}
+	/** @return the live map backing this definition; it is consulted by isTag */
+	public Map<String, Qualifier> getDisplayNameMap() {
+		return displayNameMap;
+	}
+	public void setDisplayNameMap(Map<String, Qualifier> displayNameMap) {
+		this.displayNameMap = displayNameMap;
+	}
+	/** @return the live map backing this definition; it is consulted by isTag */
+	public Map<String, Qualifier> getQualifierNameMap() {
+		return qualifierNameMap;
+	}
+	public void setQualifierNameMap(Map<String, Qualifier> qualifierNameMap) {
+		this.qualifierNameMap = qualifierNameMap;
+	}
+	public String getPrefix() {
+		return prefix;
+	}
+	public void setPrefix(String prefix) {
+		this.prefix = prefix;
+	}
+	public String getService() {
+		return service;
+	}
+	public void setService(String service) {
+		this.service = service;
+	}
+	public String getServiceCreationPath() {
+		return serviceCreationPath;
+	}
+	public void setServiceCreationPath(String serviceCreationPath) {
+		this.serviceCreationPath = serviceCreationPath;
+	}
+	public String getServiceDeletionPath() {
+		return serviceDeletionPath;
+	}
+	public void setServiceDeletionPath(String serviceDeletionPath) {
+		this.serviceDeletionPath = serviceDeletionPath;
+	}
+	/** @return the internal partition array (may be null when no partitions were set) */
+	public String[] getPartitions() {
+		return partitions;
+	}
+	public void setPartitions(String[] partitions) {
+		this.partitions = partitions;
+	}
+	/** @return the internal index array (may be null when no indexes were set) */
+	public IndexDefinition[] getIndexes() {
+		return indexes;
+	}
+	public void setIndexes(IndexDefinition[] indexes) {
+		this.indexes = indexes;
+	}
+	/** @return the live map of getter methods that getValue uses to read field values */
+	public Map<String, Method> getQualifierGetterMap() {
+		return qualifierGetterMap;
+	}
+	public void setQualifierGetterMap(Map<String, Method> qualifierGetterMap) {
+		this.qualifierGetterMap = qualifierGetterMap;
+	}
+//	public Map<String,String> getQualifierDisplayNameMap(){
+//		Map<String,String> qualifierDisplayNameMap = new HashMap<String, String>();
+//		for(Map.Entry<String,Qualifier> entry: qualifierNameMap.entrySet()){
+//			qualifierDisplayNameMap.put(entry.getKey(),entry.getValue().getDisplayName());
+//		}
+//		return qualifierDisplayNameMap;
+//	}
+	
+	/**
+	 * Tells whether a field is a tag, i.e. it has an entry neither in qualifierNameMap
+	 * nor in displayNameMap.
+	 * @param field name of the field to classify
+	 * @return true when both maps yield null for the field, false otherwise
+	 */
+	public boolean isTag(String field){
+		if (qualifierNameMap.get(field) != null) {
+			return false;
+		}
+		return displayNameMap.get(field) == null;
+	}
+
+	/**
+	 * Tells whether the given field is one of the configured partitions and is also a tag.
+	 * @param field name of the field to check
+	 * @return true only when partitions are set, the field is a tag and it equals one of the partition names
+	 */
+	public boolean isPartitionTag(String field) {
+		if (partitions != null && isTag(field)) {
+			for (int i = 0; i < partitions.length; i++) {
+				if (partitions[i].equals(field)) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+	
+	/**
+	 * Reads the value of a field from an entity described by this definition.
+	 * <p>
+	 * When a getter is registered for the field in qualifierGetterMap it is invoked reflectively;
+	 * otherwise the field is treated as a tag and looked up in the entity's tag map.
+	 * A GenericMetricShadowEntity passed to the GenericMetricEntity definition is special-cased:
+	 * its getValue() result is returned regardless of the requested field.
+	 *
+	 * @param entity entity to read from; its class must match the entity class of this definition
+	 * @param field  field (or tag) name
+	 * @return the getter result, the tag value, or null when the entity has no tags
+	 * @throws IllegalArgumentException if the entity is of a different class
+	 * @throws IllegalAccessException if the getter cannot be accessed
+	 * @throws InvocationTargetException if the getter itself throws
+	 */
+	public Object getValue(TaggedLogAPIEntity entity, String field) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+		final Class<?> actualClass = entity.getClass();
+		if (!entityClass.equals(actualClass)) {
+			final boolean shadowOfMetric = entityClass.equals(GenericMetricEntity.class)
+					&& actualClass.equals(GenericMetricShadowEntity.class);
+			if (!shadowOfMetric) {
+				throw new IllegalArgumentException("Invalid entity type: " + actualClass.getSimpleName());
+			}
+			return ((GenericMetricShadowEntity) entity).getValue();
+		}
+		final Method getter = qualifierGetterMap.get(field);
+		if (getter != null) {
+			return getter.invoke(entity);
+		}
+		// No getter registered: the field is a tag
+		if (entity.getTags() == null) {
+			return null;
+		}
+		return entity.getTags().get(field);
+	}
+
+
+	/**
+	 * Serializes this definition in the following order: entity class name, table, column family,
+	 * prefix, service, partitions, displayNameMap, qualifierNameMap, the time-series flag and finally
+	 * the optional metric definition (preceded by a presence flag).
+	 * serviceCreationPath, serviceDeletionPath, qualifierGetterMap and indexes are not written.
+	 *
+	 * @param out sink to write to
+	 * @throws IOException if writing to the sink fails
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeUTF(entityClass.getName());
+		out.writeUTF(table);
+		out.writeUTF(columnFamily);
+		out.writeUTF(prefix);
+		out.writeUTF(service);
+
+		// a missing partition array is encoded like an empty one
+		if (partitions == null) {
+			out.writeInt(0);
+		} else {
+			out.writeInt(partitions.length);
+			for (String partition : partitions) {
+				out.writeUTF(partition);
+			}
+		}
+
+		writeQualifierMap(out, displayNameMap);
+		writeQualifierMap(out, qualifierNameMap);
+
+		// TODO: write qualifierGetterMap
+		out.writeBoolean(isTimeSeries);
+
+		// presence flag first, then the metric definition itself when there is one
+		if (metricDefinition == null) {
+			out.writeBoolean(false);
+		} else {
+			out.writeBoolean(true);
+			metricDefinition.write(out);
+		}
+
+		// TODO: write indexes
+	}
+
+	/** Writes the entry count followed by every key and its serialized Qualifier. */
+	private static void writeQualifierMap(DataOutput out, Map<String, Qualifier> qualifiers) throws IOException {
+		out.writeInt(qualifiers.size());
+		for (Map.Entry<String, Qualifier> qualifier : qualifiers.entrySet()) {
+			out.writeUTF(qualifier.getKey());
+			qualifier.getValue().write(out);
+		}
+	}
+
+
+	/**
+	 * Copies the state of another definition into this one.
+	 * <p>
+	 * The copy is shallow: arrays and maps are shared with {@code ed}, not duplicated, so later changes
+	 * to them are visible through both objects.
+	 * NOTE(review): serviceCreationPath and serviceDeletionPath are not copied here - confirm whether
+	 * that is intentional.
+	 *
+	 * @param ed definition to copy from
+	 */
+	public void setEntityDefinition(EntityDefinition ed){
+		this.entityClass = ed.getEntityClass();
+		this.table = ed.getTable();
+		this.columnFamily = ed.getColumnFamily();
+		this.prefix = ed.getPrefix();
+		this.service = ed.getService();
+		this.partitions = ed.getPartitions();
+		this.displayNameMap = ed.getDisplayNameMap();
+		this.qualifierGetterMap = ed.getQualifierGetterMap();
+        this.qualifierNameMap = ed.getQualifierNameMap();
+		this.isTimeSeries = ed.isTimeSeries();
+		this.metricDefinition = ed.metricDefinition;
+		this.indexes = ed.getIndexes();
+	}
+
+	//////////////////////////////////////////////
+	// 	TODO: Cache object for reading in region side
+	//////////////////////////////////////////////
+	//	private final static Map<String,EntityDefinition> _classEntityDefinitionCache = new HashMap<String, EntityDefinition>();
+
+	/**
+	 * Restores the state produced by {@link #write(DataOutput)}.
+	 * <p>
+	 * All fields carried by the stream are replaced rather than merged, so an instance that is reused
+	 * for several reads never keeps qualifiers, a metric definition or an entity class from an earlier
+	 * read. qualifierGetterMap, indexes and the service creation/deletion paths are not part of the
+	 * stream and are left untouched.
+	 * <p>
+	 * If the entity class cannot be loaded, or is not a TaggedLogAPIEntity, a warning is logged,
+	 * entityClass is set to null and the remaining fields are still read.
+	 *
+	 * @param in source to read from
+	 * @throws IOException if reading fails or the stream carries a negative partition count
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		final String entityClassName = in.readUTF();
+		// TODO: consult/populate the per-class definition cache here once it is enabled (see the TODO above)
+		if(LOG.isDebugEnabled()) LOG.debug("Reading EntityDefinition entity: "+entityClassName);
+		// reset first so that a failed lookup cannot leave the class of a previously read definition behind
+		entityClass = null;
+		try {
+			// asSubclass rejects non-entity classes instead of storing them through an unchecked cast
+			entityClass = Class.forName(entityClassName).asSubclass(TaggedLogAPIEntity.class);
+		} catch (Exception e) {
+			// best effort (presumably the class can be absent from the reader's classpath): report and go on
+			LOG.warn("Failed to load entity class " + entityClassName + ", entityClass is left unset", e);
+		}
+		table = in.readUTF();
+		columnFamily = in.readUTF();
+		prefix = in.readUTF();
+		service = in.readUTF();
+
+		final int partitionsLen = in.readInt();
+		if (partitionsLen < 0) {
+			throw new IOException("Invalid partition count " + partitionsLen + " for entity " + entityClassName);
+		}
+		final String[] readPartitions = new String[partitionsLen];
+		for (int i = 0; i < partitionsLen; i++) {
+			readPartitions[i] = in.readUTF();
+		}
+		partitions = readPartitions;
+
+		// fresh maps: never append to (or mutate) maps that may be shared with another definition
+		displayNameMap = readQualifierMap(in);
+		qualifierNameMap = readQualifierMap(in);
+
+		// TODO: readFields qualifierGetterMap
+		isTimeSeries = in.readBoolean();
+
+		// readFields MetricDefinition
+		final boolean hasMetricDefinition = in.readBoolean();
+		if(hasMetricDefinition) {
+			if(LOG.isDebugEnabled()) LOG.debug("reading metricDefinition");
+			final MetricDefinition readDefinition = new MetricDefinition();
+			readDefinition.readFields(in);
+			metricDefinition = readDefinition;
+		} else {
+			metricDefinition = null;
+		}
+		// TODO: readFields indexes
+	}
+
+	/** Reads an entry count followed by that many (key, Qualifier) pairs into a new map. */
+	private static Map<String, Qualifier> readQualifierMap(DataInput in) throws IOException {
+		final int size = in.readInt();
+		final Map<String, Qualifier> qualifiers = new HashMap<String, Qualifier>();
+		for (int i = 0; i < size; i++) {
+			final String key = in.readUTF();
+			final Qualifier value = new Qualifier();
+			value.readFields(in);
+			qualifiers.put(key, value);
+		}
+		return qualifiers;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
new file mode 100755
index 0000000..e144e05
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.repo.EntityRepositoryScanner;
+import org.mockito.cglib.beans.BeanGenerator;
+import org.mockito.cglib.core.NamingPolicy;
+import org.mockito.cglib.core.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * static initialization of all registered entities. As of now, dynamic registration is not supported
+ */
+public class EntityDefinitionManager {
+	private static final Logger LOG = LoggerFactory.getLogger(EntityDefinitionManager.class);
+	private static volatile boolean initialized = false;
+	/**
+	 * using concurrent hashmap is due to the fact that entity can be registered any time from any thread
+	 */
+	private static Map<String, EntityDefinition> entityServiceMap = new ConcurrentHashMap<String, EntityDefinition>();
+	private static Map<Class<? extends TaggedLogAPIEntity>, EntityDefinition> classMap = new ConcurrentHashMap<Class<? extends TaggedLogAPIEntity>, EntityDefinition>();
+	private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new ConcurrentHashMap<Class<?>, EntitySerDeser<?>>(); 
+	private static Map<Class<?>, Integer> _serDeserClassIDMap = new ConcurrentHashMap<Class<?>, Integer>(); 
+	private static Map<Integer, Class<?>> _serIDDeserClassMap = new ConcurrentHashMap<Integer, Class<?>>(); 
+	private static Map<String, Map<Integer, EntityDefinition>> entityPrefixMap = new ConcurrentHashMap<String, Map<Integer, EntityDefinition>>();
+	private static Map<String, Map<Integer, IndexDefinition>> indexPrefixMap = new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>();
+
+	static{
+		// Built-in SerDesers. IDs are handed out sequentially starting at 0, in exactly this order.
+		// NOTE(review): presumably these IDs end up in serialized data, so do not reorder - confirm.
+		int id = 0;
+		id = registerBuiltinSerDeser(id, NullObject.class, new NullSerDeser());
+		id = registerBuiltinSerDeser(id, String.class, new StringSerDeser());
+		id = registerBuiltinSerDeser(id, long.class, new LongSerDeser());
+		id = registerBuiltinSerDeser(id, Long.class, new LongSerDeser());
+		id = registerBuiltinSerDeser(id, int.class, new IntSerDeser());
+		id = registerBuiltinSerDeser(id, Integer.class, new IntSerDeser());
+		id = registerBuiltinSerDeser(id, Double.class, new DoubleSerDeser());
+		id = registerBuiltinSerDeser(id, double.class, new DoubleSerDeser());
+		id = registerBuiltinSerDeser(id, int[].class, new IntArraySerDeser());
+		id = registerBuiltinSerDeser(id, double[].class, new DoubleArraySerDeser());
+		id = registerBuiltinSerDeser(id, double[][].class, new Double2DArraySerDeser());
+		id = registerBuiltinSerDeser(id, Boolean.class, new BooleanSerDeser());
+		id = registerBuiltinSerDeser(id, boolean.class, new BooleanSerDeser());
+		id = registerBuiltinSerDeser(id, String[].class, new StringArraySerDeser());
+		id = registerBuiltinSerDeser(id, Map.class, new MapSerDeser());
+		registerBuiltinSerDeser(id, List.class, new ListSerDeser());
+	}
+
+	/**
+	 * Records a built-in SerDeser in all three registries (class to SerDeser, id to class, class to id).
+	 * @return the id to use for the next registration, i.e. {@code id + 1}
+	 */
+	private static int registerBuiltinSerDeser(int id, Class<?> clazz, EntitySerDeser<?> serDeser) {
+		_serDeserMap.put(clazz, serDeser);
+		_serIDDeserClassMap.put(id, clazz);
+		_serDeserClassIDMap.put(clazz, id);
+		return id + 1;
+	}
+	
+	
+
+	/**
+	 * Looks up the SerDeser registered for the exact given class.
+	 * @param clazz field type to look up
+	 * @return the registered SerDeser (raw type), or null when none is registered for that class
+	 */
+	@SuppressWarnings("rawtypes")
+	public static EntitySerDeser getSerDeser(Class<?> clazz){
+		return _serDeserMap.get(clazz);
+	}
+
+	/**
+	 * Resolves the internal ID assigned to a predefined registered class.
+	 * @param clazz class used for serialization/deserialization
+	 * @return the internal id when the class has one, otherwise -1
+	 */
+	public static int getIDBySerDerClass(Class<?> clazz) {
+		final Integer registeredId = _serDeserClassIDMap.get(clazz);
+		return registeredId == null ? -1 : registeredId.intValue();
+	}
+	
+
+	/**
+	 * Resolves the predefined registered class that owns an internal ID.
+	 * @param id the internal class ID
+	 * @return the class registered under that ID, or null when the ID is unknown
+	 */
+	public static Class<?> getClassByID(int id) {
+		final Integer key = Integer.valueOf(id);
+		return _serIDDeserClassMap.get(key);
+	}
+	
+	/**
+	 * Registers a user-supplied entity class: builds its definition with createEntityDefinition and
+	 * registers it under the service name stored in that definition.
+	 * @param clazz entity class
+	 * @throws IllegalArgumentException if the definition cannot be built or the registration is rejected
+	 */
+	public static void registerEntity(Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{
+		registerEntity(createEntityDefinition(clazz));
+	}
+	
+	/**
+	 * Registers a user-supplied entity class under an explicitly given service name; the definition
+	 * itself is still built with createEntityDefinition.
+	 * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities
+	 * @param serviceName entity service name
+	 * @param clazz entity class
+	 * @throws IllegalArgumentException if the definition cannot be built or the registration is rejected
+	 * 
+	 */
+    @Deprecated
+	public static void registerEntity(String serviceName, Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{
+		registerEntity(serviceName, createEntityDefinition(clazz));
+	}
+	
+	/**
+	 * Registers a user-supplied entity definition under the service name it carries
+	 * ({@code entityDef.getService()}).
+	 * @param entityDef entity definition
+	 * @throws IllegalArgumentException if the registration is rejected
+	 */
+	public static void registerEntity(EntityDefinition entityDef) {
+		registerEntity(entityDef.getService(), entityDef);
+	}
+	
+	/**
+	 * Registers an entity definition under the given service name and indexes it by table and prefix
+	 * hash, by index prefix hash and by entity class.
+	 * <p>
+	 * Registering the same entity class again for an already registered service is a no-op;
+	 * registering a different entity class for that service is rejected.
+	 *
+	 * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities. 
+	 * 
+	 * @param serviceName entity service name
+	 * @param entityDef entity definition
+	 * @throws IllegalArgumentException if the service name is already taken by another entity class,
+	 *         or if checkPrefix reports a prefix hash code conflict
+	 */
+	public static void registerEntity(String serviceName, EntityDefinition entityDef) {
+		final String table = entityDef.getTable();
+		final EntityDefinition existing = entityServiceMap.get(serviceName);
+		if (existing != null) {
+			// Compare the entity classes. Comparing the definitions' own getClass() is always true, since
+			// both are EntityDefinition instances, which silently dropped conflicting registrations.
+			final Class<?> existingClass = existing.getEntityClass();
+			final Class<?> newClass = entityDef.getEntityClass();
+			if (existingClass == null ? newClass == null : existingClass.equals(newClass)) {
+				return;
+			}
+			throw new IllegalArgumentException("Service " + serviceName + " has already been registered by "
+					+ (existingClass == null ? null : existingClass.getName()) + ", so class "
+					+ (newClass == null ? null : newClass.getName()) + " can NOT be registered");
+		}
+		synchronized (EntityDefinitionManager.class) {
+			checkPrefix(entityDef);
+			entityServiceMap.put(serviceName, entityDef);
+			// per table: prefix hash code -> entity definition
+			Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(table);
+			if (entityHashMap == null) {
+				entityHashMap = new ConcurrentHashMap<Integer, EntityDefinition>();
+				entityPrefixMap.put(table, entityHashMap);
+			}
+			entityHashMap.put(entityDef.getPrefix().hashCode(), entityDef);
+			// per table: index prefix hash code -> index definition
+			final IndexDefinition[] indexes = entityDef.getIndexes();
+			if (indexes != null) {
+				for (IndexDefinition index : indexes) {
+					Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(table);
+					if (indexHashMap == null) {
+						indexHashMap = new ConcurrentHashMap<Integer, IndexDefinition>();
+						indexPrefixMap.put(table, indexHashMap);
+					}
+					indexHashMap.put(index.getIndexPrefix().hashCode(), index);
+				}
+			}
+			classMap.put(entityDef.getEntityClass(), entityDef);
+		}
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(entityDef.getEntityClass().getSimpleName() + " entity registered successfully, table name: " + entityDef.getTable() +
+					", prefix: " + entityDef.getPrefix() + ", service: " + serviceName + ", CF: " + entityDef.getColumnFamily());
+        }else{
+            LOG.info(String.format("Registered %s (%s)", entityDef.getEntityClass().getSimpleName(), serviceName));
+        }
+	}
+
+	/**
+	 * Verifies that neither the entity prefix nor any index prefix of the given definition collides
+	 * (by String hash code) with a prefix already registered for the same table.
+	 * Nothing is checked when no entity has been registered for the table yet.
+	 *
+	 * @param entityDef definition about to be registered
+	 * @throws IllegalArgumentException on a prefix hash code conflict; the message names the entity
+	 *         class being registered and the service that already owns the prefix
+	 */
+	private static void checkPrefix(EntityDefinition entityDef) {
+		final Integer entityPrefixHashcode = entityDef.getPrefix().hashCode();
+		final Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(entityDef.getTable());
+		if (entityHashMap == null) {
+			return;
+		}
+		// report the entity class; entityDef.getClass() would always print EntityDefinition
+		final Class<?> entityClass = entityDef.getEntityClass();
+		final String entityName = entityClass == null ? "null" : entityClass.getName();
+
+		final EntityDefinition prefixOwner = entityHashMap.get(entityPrefixHashcode);
+		if (prefixOwner != null && (!entityDef.equals(prefixOwner))) {
+			throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The entity prefix " + entityDef.getPrefix() + " has already been registered by entity service " + prefixOwner.getService());
+		}
+		final IndexDefinition[] indexes = entityDef.getIndexes();
+		if (indexes == null) {
+			return;
+		}
+		for (IndexDefinition index : indexes) {
+			final Integer indexPrefixHashcode = index.getIndexPrefix().hashCode();
+			// an index prefix must not collide with any entity prefix of the table ...
+			final EntityDefinition entityOwner = entityHashMap.get(indexPrefixHashcode);
+			if (entityOwner != null) {
+				throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + entityOwner.getService());
+			}
+			// ... nor with a different index already registered for the table
+			final Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(entityDef.getTable());
+			if (indexHashMap != null) {
+				final IndexDefinition indexOwner = indexHashMap.get(indexPrefixHashcode);
+				if (indexOwner != null && (!index.equals(indexOwner))) {
+					throw new IllegalArgumentException("Failed to register entity " + entityName + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + indexOwner.getEntityDefinition().getService());
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Gets the entity definition registered under a service name, running the one-time
+	 * repository scan (checkInit) first if it has not happened yet.
+	 * @param serviceName service name to look up
+	 * @return the registered definition, or null when no entity is registered under that name
+	 * @throws IllegalAccessException declared by checkInit
+	 * @throws InstantiationException declared by checkInit
+	 */
+	public static EntityDefinition getEntityByServiceName(String serviceName) throws InstantiationException, IllegalAccessException{
+		checkInit();
+		return entityServiceMap.get(serviceName);
+	}
+	
+	/**
+	 * Gets the entity definition registered for an entity class, running the one-time
+	 * repository scan (checkInit) first if it has not happened yet.
+	 * @param clazz entity class to look up
+	 * @return the registered definition, or null when the class has not been registered
+	 * @throws IllegalAccessException declared by checkInit
+	 * @throws InstantiationException declared by checkInit
+	 */
+	public static EntityDefinition getEntityDefinitionByEntityClass(Class<? extends TaggedLogAPIEntity> clazz) throws InstantiationException, IllegalAccessException {
+		checkInit();
+		return classMap.get(clazz);
+	}
+
+	/**
+	 * Runs EntityRepositoryScanner.scan() once, guarded by the volatile initialized flag and the
+	 * class lock. The flag is only set after scan() returns, so a scan that throws is retried by
+	 * the next caller.
+	 */
+	private static void checkInit() throws InstantiationException, IllegalAccessException {
+		if (initialized) {
+			return;
+		}
+		synchronized (EntityDefinitionManager.class) {
+			// re-check under the lock: another thread may have finished the scan meanwhile
+			if (initialized) {
+				return;
+			}
+			EntityRepositoryScanner.scan();
+			initialized = true;
+		}
+	}
+
+	/**
+	 * Registers (or replaces) the SerDeser used for a given class.
+	 * Only the class-to-SerDeser map is updated; no internal ID is assigned, so getIDBySerDerClass
+	 * keeps returning -1 for a class that was registered only through this method.
+	 * @param clazz class handled by the SerDeser
+	 * @param entitySerDeser entity or field SerDeser
+	 */
+	public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> entitySerDeser) {
+		_serDeserMap.put(clazz, entitySerDeser);
+	}
+
+	/**
+	 * Tells whether an entity class is declared as time series through its TimeSeries annotation.
+	 * @param clazz entity class to inspect
+	 * @return the annotation's value, or false when the class carries no TimeSeries annotation
+	 */
+	public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> clazz){
+		final TimeSeries annotation = clazz.getAnnotation(TimeSeries.class);
+		if (annotation == null) {
+			return false;
+		}
+		return annotation.value();
+	}
+
+	/**
+	 * Builds an EntityDefinition from the annotations found on an entity class.
+	 * <p>
+	 * Class-level annotations read: Table, ColumnFamily, Prefix and TimeSeries (all mandatory), plus
+	 * Service, Metric, Partition, Indexes and ServicePath (optional). Every declared field carrying a
+	 * non-empty Column annotation becomes a Qualifier and must have a registered SerDeser for its
+	 * exact type as well as a public no-arg getter named getXxx or isXxx.
+	 * Only fields declared directly on {@code cls} are scanned (getDeclaredFields).
+	 * The definition is only built here; it is not registered.
+	 *
+	 * @param cls annotated entity class
+	 * @return a new, fully populated definition
+	 * @throws IllegalArgumentException if a mandatory annotation is missing or empty, a metric entity
+	 *         is not time series or has a non-array field type, a field type has no SerDeser, a getter
+	 *         is missing, or a partition name refers to a column field
+	 */
+	@SuppressWarnings("unchecked")
+	public static EntityDefinition createEntityDefinition(Class<? extends TaggedLogAPIEntity> cls) {
+		
+		final EntityDefinition ed = new EntityDefinition();
+
+		ed.setEntityClass(cls);
+		// parse cls' annotations
+		Table table = cls.getAnnotation(Table.class);
+		if(table == null || table.value().isEmpty()){
+			throw new IllegalArgumentException("Entity class must have a non-empty table name annotated with @Table");
+		}
+		String tableName = table.value();
+		// optionally qualify the table name as "<env>_<table>", driven by configuration
+		if(EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()){
+			tableName = EagleConfigFactory.load().getEnv() + "_" + tableName;
+		}
+		ed.setTable(tableName);
+		
+		ColumnFamily family = cls.getAnnotation(ColumnFamily.class);
+		if(family == null || family.value().isEmpty()){
+			throw new IllegalArgumentException("Entity class must have a non-empty column family name annotated with @ColumnFamily");
+		}
+		ed.setColumnFamily(family.value());
+		
+		Prefix prefix = cls.getAnnotation(Prefix.class);
+		if(prefix == null || prefix.value().isEmpty()){
+			throw new IllegalArgumentException("Entity class must have a non-empty prefix name annotated with @Prefix");
+		}
+		ed.setPrefix(prefix.value());
+		
+		// the annotation itself is mandatory; its boolean value decides whether the entity is time series
+		TimeSeries ts = cls.getAnnotation(TimeSeries.class);
+		if(ts == null){
+			throw new IllegalArgumentException("Entity class must have a non-empty timeseries name annotated with @TimeSeries");
+		}
+		ed.setTimeSeries(ts.value());
+		
+		// the service name falls back to the simple class name when @Service is absent or empty
+		Service service = cls.getAnnotation(Service.class);
+		if(service == null || service.value().isEmpty()){
+			ed.setService(cls.getSimpleName());
+		} else {
+			ed.setService(service.value());
+		}
+
+		Metric m = cls.getAnnotation(Metric.class);
+		// field name -> element type of the array-typed metric field; filled by checkFieldTypeForMetric
+		Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, Class<?>>();
+		if(m != null){
+			// metric has to be timeseries
+			if(!ts.value()){
+				throw new IllegalArgumentException("Metric entity must be time series as well");
+			}
+			MetricDefinition md = new MetricDefinition();
+			md.setInterval(m.interval());
+			ed.setMetricDefinition(md);
+		}
+
+		java.lang.reflect.Field[] fields = cls.getDeclaredFields();
+		for(java.lang.reflect.Field f : fields){
+			// fields without a non-empty @Column are not persisted as qualifiers
+			Column column = f.getAnnotation(Column.class); 
+			if(column == null || column.value().isEmpty()){
+				continue;
+			}
+			Class<?> fldCls = f.getType();
+			// intrusive check field type for metric entity
+			checkFieldTypeForMetric(ed.getMetricDefinition(), f.getName(), fldCls, dynamicFieldTypes);
+			Qualifier q = new Qualifier();
+			q.setDisplayName(f.getName());
+			q.setQualifierName(column.value());
+			// the SerDeser is looked up by the exact declared field type
+			EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); 
+			if(serDeser == null){
+				throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + 
+						" of entity " + cls.getSimpleName() + " has no serializer associated ");
+			} else {
+				q.setSerDeser((EntitySerDeser<Object>)serDeser);
+			}
+			// the same Qualifier instance is reachable by column name and by field name
+			ed.getQualifierNameMap().put(q.getQualifierName(), q);
+			ed.getDisplayNameMap().put(q.getDisplayName(), q);
+			// TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils
+			final String propertyName = f.getName().substring(0,1).toUpperCase() + f.getName().substring(1);
+			String getterName = "get" + propertyName;
+			try {
+				Method method = cls.getMethod(getterName);
+				ed.getQualifierGetterMap().put(f.getName(), method);
+			} catch (Exception e) {
+				// Check if the type is boolean
+				getterName = "is" + propertyName;
+				try {
+					Method method = cls.getMethod(getterName);
+					ed.getQualifierGetterMap().put(f.getName(), method);
+				} catch (Exception e1) {
+					// the message names the "is" getter; the cause is the failure of the "get" lookup
+					throw new IllegalArgumentException("Field " + f.getName() + " hasn't defined valid getter method: " + getterName, e);
+				}
+			}
+			if(LOG.isDebugEnabled()) LOG.debug("Field registered " + q);
+		}
+
+		// TODO: Lazy create because not used at all
+		// dynamically create bean class
+		if(ed.getMetricDefinition() != null){
+			Class<?> metricCls = createDynamicClassForMetric(cls.getName()+"_SingleTimestamp", dynamicFieldTypes);
+			ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls);
+		}
+		
+		final Partition partition = cls.getAnnotation(Partition.class);
+		if (partition != null) {
+			final String[] partitions = partition.value();
+			ed.setPartitions(partitions);
+			// Check if partition fields are all tag fields. Partition field can't be column field, must be tag field.
+			for (String part : partitions) {
+				if (!ed.isTag(part)) {
+					throw new IllegalArgumentException("Partition field can't be column field, must be tag field. "
+							+ "Partition name: " + part);
+				}
+			}
+		}
+		
+		final Indexes indexes = cls.getAnnotation(Indexes.class);
+		if (indexes != null) {
+			final Index[] inds = indexes.value();
+			final IndexDefinition[] indexDefinitions = new IndexDefinition[inds.length];
+			for (int i = 0; i < inds.length; ++i) {
+				final Index ind = inds[i];
+				indexDefinitions[i] = new IndexDefinition(ed, ind);
+			}
+			ed.setIndexes(indexDefinitions);
+		}
+		
+		// only the creation path is taken from @ServicePath; the deletion path is not set here
+		final ServicePath path = cls.getAnnotation(ServicePath.class);
+		if (path != null) {
+			if (path.path() != null && (!path.path().isEmpty())) {
+				ed.setServiceCreationPath(path.path());
+			}
+		}
+		
+		return ed;
+	}
+	
+	/**
+	 * For metric entities only (md != null): checks that a field is an int[], long[] or double[] and
+	 * records its element type under the field name. Does nothing when md is null.
+	 *
+	 * @param md metric definition of the entity, or null for non-metric entities
+	 * @param fieldName name of the field being inspected
+	 * @param fldCls declared type of the field
+	 * @param dynamicFieldTypes receives fieldName -> element type
+	 * @throws IllegalArgumentException if a metric field has any other type
+	 */
+	private static void checkFieldTypeForMetric(MetricDefinition md, String fieldName, Object fldCls, Map<String, Class<?>> dynamicFieldTypes){
+		if(md == null){
+			return;
+		}
+		final Class<?> elementType;
+		if(fldCls.equals(int[].class)){
+			elementType = int.class;
+		}else if(fldCls.equals(long[].class)){
+			elementType = long.class;
+		}else if(fldCls.equals(double[].class)){
+			elementType = double.class;
+		}else{
+			throw new IllegalArgumentException("Fields for metric entity must be one of int[], long[] or double[]");
+		}
+		dynamicFieldTypes.put(fieldName, elementType);
+	}
+	
+	/**
+	 * Generates, at runtime, a bean class that extends TaggedLogAPIEntity and has one property per
+	 * entry of dynamicFieldTypes (property name -> property type).
+	 * The BeanGenerator used here comes from the cglib copy bundled under org.mockito.cglib.
+	 *
+	 * @param className name reported by the naming policy for the generated class
+	 * @param dynamicFieldTypes properties to generate
+	 * @return the generated class
+	 */
+	private static Class<?> createDynamicClassForMetric(final String className, Map<String, Class<?>> dynamicFieldTypes){
+		BeanGenerator beanGenerator = new BeanGenerator();
+		// the naming policy ignores cglib's suggestions and always answers with the requested name
+		beanGenerator.setNamingPolicy(new NamingPolicy(){
+	        @Override 
+	        public String getClassName(String prefix,String source, Object key, Predicate names){
+	            return className;
+	        }});
+	    BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes);
+	    beanGenerator.setSuperclass(TaggedLogAPIEntity.class);
+	    return (Class<?>) beanGenerator.createClass();
+	}
+	
+	/**
+	 * Returns all registered entity definitions keyed by service name, running the one-time
+	 * repository scan (checkInit) first if it has not happened yet.
+	 * The returned map is the live internal registry, not a copy.
+	 * @throws Exception whatever checkInit propagates
+	 */
+	public static Map<String, EntityDefinition> entities() throws Exception{
+		checkInit();
+		return entityServiceMap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
new file mode 100755
index 0000000..25d55e0
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * Contract for converting values of type {@code T} to and from a byte array.
+ *
+ * @param <T> the Java type handled by the implementation
+ */
+public interface EntitySerDeser<T> {
+	/**
+	 * Rebuilds a value from its byte representation.
+	 *
+	 * @param bytes serialized form
+	 * @return the decoded value; implementations may return null (e.g. when the input is too short)
+	 */
+	public T deserialize(byte[] bytes);
+	/**
+	 * Produces the byte representation of a value.
+	 *
+	 * @param t value to encode
+	 * @return serialized form; implementations may return null for a null value
+	 */
+	public byte[] serialize(T t);
+	/**
+	 * @return the class object of the handled type
+	 */
+	public Class<T> type();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
new file mode 100755
index 0000000..a7ec4e4
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.beans.PropertyDescriptor;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Converts between an entity object and a map of qualifier name to serialized bytes,
+ * using the qualifier metadata of an {@link EntityDefinition}.
+ */
+public class EntitySerDeserializer {
+	private static final Logger LOG = LoggerFactory.getLogger(EntitySerDeserializer.class);
+	
+	/**
+	 * Instantiates the entity class of {@code ed} and populates it from {@code qualifierValues}.
+	 * Keys that are not known qualifier names are treated as tags and decoded as strings;
+	 * known qualifiers with a non-null value are decoded with the qualifier's serializer and
+	 * written through the bean setter of the qualifier's display name.
+	 *
+	 * @param qualifierValues qualifier (or tag) name to serialized bytes
+	 * @param ed definition of the entity to build
+	 * @return the populated entity, cast to the caller's expected type
+	 * @throws NullPointerException if the definition has no entity class
+	 * @throws IllegalArgumentException if a qualifier with a value has no writable bean property on the entity
+	 * @throws Exception on instantiation or reflection failures
+	 */
+	// TODO throw separate exceptions
+	@SuppressWarnings("unchecked")
+	public <T> T readValue(Map<String, byte[]> qualifierValues, EntityDefinition ed) throws Exception{
+		Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass();
+		if(clazz == null){
+			throw new NullPointerException("Entity class of service "+ed.getService()+" is null");
+		}
+		TaggedLogAPIEntity obj = clazz.newInstance();
+		Map<String, Qualifier> map = ed.getQualifierNameMap();
+		for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
+			Qualifier q = map.get(entry.getKey());
+			if(q == null){
+				// if it's not pre-defined qualifier, it must be tag unless it's a bug
+				if(obj.getTags() == null){
+					obj.setTags(new HashMap<String, String>());
+				}
+				obj.getTags().put(entry.getKey(), new StringSerDeser().deserialize(entry.getValue()));
+				continue;
+			}
+			
+			// TODO performance loss compared with new operator
+			// parse different types of qualifiers
+			String fieldName = q.getDisplayName();
+			PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName);
+			if(entry.getValue() != null){
+				// Fail with a message that names the field instead of a bare NullPointerException.
+				if(pd == null || pd.getWriteMethod() == null){
+					throw new IllegalArgumentException("No writable property " + fieldName + " on entity class "
+							+ clazz.getName() + " for qualifier " + entry.getKey());
+				}
+				Object args = q.getSerDeser().deserialize(entry.getValue());
+				pd.getWriteMethod().invoke(obj, args);
+			}
+		}
+		return (T)obj;
+	}
+	
+	/**
+	 * Serializes every modified qualifier of {@code entity}.
+	 * Each field name reported by {@code entity.modifiedQualifiers()} is read through its bean getter,
+	 * encoded with the qualifier's serializer and stored under the qualifier name.
+	 *
+	 * @param entity entity whose modified fields are written
+	 * @param ed definition providing the display-name to qualifier mapping
+	 * @return qualifier name to serialized bytes; a value is whatever the serializer returned and may be null
+	 * @throws IllegalArgumentException if a modified field is not a defined qualifier or has no readable bean property
+	 * @throws Exception on reflection failures
+	 */
+	public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, EntityDefinition ed) throws Exception{
+		Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>();
+		// iterate all modified qualifiers
+		for(String fieldName : entity.modifiedQualifiers()){
+			PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, fieldName);
+			if(pd == null || pd.getReadMethod() == null){
+				throw new IllegalArgumentException("No readable property " + fieldName + " on entity class "
+						+ entity.getClass().getName());
+			}
+			Object obj = pd.getReadMethod().invoke(entity);
+			Qualifier q = ed.getDisplayNameMap().get(fieldName);
+			if(q == null){
+				throw new IllegalArgumentException("Field " + fieldName + " is not a defined qualifier of service "
+						+ ed.getService());
+			}
+			EntitySerDeser<Object> ser = q.getSerDeser();
+			byte[] value = ser.serialize(obj);
+			qualifierValues.put(q.getQualifierName(), value);
+		}
+		return qualifierValues;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
new file mode 100755
index 0000000..c7dc113
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation describing one index. It is retained at runtime so it can be read reflectively.
+ * All three elements are mandatory because none declares a default.
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Index {
+
+    /** Name of the index. */
+    public String name();
+    /** Names of the indexed columns, in index order. */
+    public String[] columns();
+    /** Whether the index is unique. */
+    public boolean unique();
+    // Alternative declaration with a default value, currently disabled:
+//	boolean unique() default true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
new file mode 100755
index 0000000..2e62420
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.query.parser.ANDExpression;
+import org.apache.eagle.query.parser.AtomicExpression;
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.ORExpression;
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Eagle index schema definition.
+ * 
+ * 1. Index schema can be defined in entity class by annotation.
+ * 2. One index schema can contain multiple fields/tags, defined in order
+ * 3. We only support immutable indexing for now
+ * 4. When entity is created or deleted, the corresponding index entity should be created or deleted at the same time
+ * 5. Index transparency to queries. Queries go through index when and only when index can serve all search conditions after query rewrite
+ * 
+ *
+ */
+public class IndexDefinition {
+	
+	/**
+	 * Outcome of checking whether a query can be served by an index.
+	 */
+	public enum IndexType {
+		// Returned by canGoThroughIndex when the index is declared unique and covers the query.
+		UNIQUE_INDEX,
+		// Returned by canGoThroughIndex for a non-unique index, and also for a null/empty query.
+		NON_CLUSTER_INDEX,
+		// Returned by canGoThroughIndex when some index column has no equality condition in the query.
+		NON_INDEX
+	}
+
+	// Entity this index belongs to.
+	private final EntityDefinition entityDef;
+	// Annotation instance the definition was built from.
+	private final Index index;
+	// Index columns, in the order declared by index.columns().
+	private final IndexColumn[] columns;
+	// "<entity prefix>_<index name>"; its hash code forms the first 4 bytes of every index rowkey.
+	private final String indexPrefix;
+	
+	// Shared placeholder used for null tag/field values.
+	private static final byte[] EMPTY_VALUE = new byte[0];
+	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+	// Hash code used for a partition whose tag is missing on the entity.
+	public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
+	// Upper bound for one serialized index value; the rowkey stores each value length in 2 bytes.
+	public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535;
+	
+	// Matches names that start with '@' and captures everything after it.
+	private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$";
+	private static final Pattern FIELD_NAME_PATTERN = Pattern.compile(FIELD_NAME_PATTERN_STRING);
+	private final static Logger LOG = LoggerFactory.getLogger(IndexDefinition.class);
+
+	/**
+	 * Builds the index definition for one {@link Index} annotation of an entity.
+	 * Each declared column is resolved either as a tag (no qualifier) or as a qualifier looked up
+	 * by display name.
+	 *
+	 * @param entityDef definition of the indexed entity
+	 * @param index index annotation
+	 */
+	public IndexDefinition(EntityDefinition entityDef, Index index) {
+		this.entityDef = entityDef;
+		this.index = index;
+		this.indexPrefix = entityDef.getPrefix() + "_" + index.name();
+		final String[] columnNames = index.columns();
+		final IndexColumn[] resolved = new IndexColumn[columnNames.length];
+		int position = 0;
+		for (String columnName : columnNames) {
+			final boolean tagColumn = entityDef.isTag(columnName);
+			Qualifier columnQualifier = null;
+			if (!tagColumn) {
+				columnQualifier = entityDef.getDisplayNameMap().get(columnName);
+			}
+			resolved[position++] = new IndexColumn(columnName, tagColumn, columnQualifier);
+		}
+		this.columns = resolved;
+		LOG.info("Created index " + index.name() + " for " + entityDef.getEntityClass().getSimpleName());
+	}
+
+	/** @return the definition of the entity this index belongs to */
+	public EntityDefinition getEntityDefinition() {
+		return entityDef;
+	}
+	
+	/** @return the annotation this definition was built from */
+	public Index getIndex() {
+		return index;
+	}
+	
+	/** @return the name declared in the index annotation */
+	public String getIndexName() {
+		return index.name();
+	}
+	
+	/** @return the internal column array itself (not a copy), in declaration order */
+	public IndexColumn[] getIndexColumns() {
+		return columns;
+	}
+	
+	/** @return the entity prefix and the index name joined by an underscore */
+	public String getIndexPrefix() {
+		return indexPrefix;
+	}
+	
+	/** @return the {@code unique} flag of the index annotation */
+	public boolean isUnique() {
+		return index.unique();
+	}
+	
+	/**
+	 * Checks whether the query can be served by this index and, if so, computes the index rowkeys to read.
+	 * For every AND branch, only conditions of the form {@code @field = value} are considered. Each entity
+	 * partition and each index column must be bound by such a condition; one rowkey is generated per branch.
+	 * TODO: currently index fields should be string type.
+	 * 
+	 * @param query query expression after re-write
+	 * @param rowkeys optional output list. For a non-empty query it is cleared first and then receives one rowkey
+	 *                per AND branch. When {@code NON_INDEX} is returned it may still hold the rowkeys of earlier
+	 *                branches and must be ignored.
+	 * @return {@code NON_INDEX} if some AND branch lacks an equality condition for an index column;
+	 *         otherwise {@code UNIQUE_INDEX} or {@code NON_CLUSTER_INDEX} depending on the index declaration.
+	 *         A null or empty query yields {@code NON_CLUSTER_INDEX} without touching {@code rowkeys}.
+	 * @throws IllegalArgumentException if an AND branch has no equality condition for one of the entity partitions
+	 */
+	public IndexType canGoThroughIndex(ORExpression query, List<byte[]> rowkeys) {
+		if (query == null || query.getANDExprList() == null || query.getANDExprList().isEmpty()) {
+			return IndexType.NON_CLUSTER_INDEX;
+		}
+		if (rowkeys != null) {
+			rowkeys.clear();
+		}
+		final Map<String, String> indexfieldMap = new HashMap<String, String>();
+		for(ANDExpression andExpr : query.getANDExprList()) {
+			indexfieldMap.clear();
+			for(AtomicExpression ae : andExpr.getAtomicExprList()) {
+				// TODO temporarily ignore those fields which are not for attributes
+				final String fieldName = parseEntityAttribute(ae.getKey());
+				if(fieldName != null && ComparisonOperator.EQUAL.equals(ae.getOp())){
+					indexfieldMap.put(fieldName, ae.getValue());
+				}
+			}
+			// Partitions are validated before the index columns, so a missing partition raises
+			// even when the index could not serve the query anyway.
+			final String[] partitions = entityDef.getPartitions();
+			int[] partitionValueHashs = null;
+			if (partitions != null) {
+				partitionValueHashs = new int[partitions.length];
+				for (int i = 0; i < partitions.length; ++i) {
+					final String value = indexfieldMap.get(partitions[i]);
+					if (value == null) {
+						throw new IllegalArgumentException("Partition " + partitions[i] + " is not defined in the query: " + query.toString());
+					}
+					partitionValueHashs[i] = value.hashCode();
+				}
+			}
+			final byte[][] indexFieldValues = new byte[columns.length][];
+			for (int i = 0; i < columns.length; ++i) {
+				final IndexColumn col = columns[i];
+				if (!indexfieldMap.containsKey(col.getColumnName())) {
+					// If we have to use scan anyway, there's no need to go through index
+					return IndexType.NON_INDEX;
+				}
+				final String value = indexfieldMap.get(col.getColumnName());
+				// Encode with the same charset that generateIndexValues uses when the index row is
+				// written; the platform default charset could yield different bytes for non-ASCII values.
+				indexFieldValues[i] = value.getBytes(UTF_8_CHARSET);
+			}
+			// No tag map is passed here, so the generated key has no tag hash suffix.
+			final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null);
+			if (rowkeys != null) {
+				rowkeys.add(rowkey);
+			}
+		}
+		if (index.unique()) {
+			return IndexType.UNIQUE_INDEX;
+		}
+		return IndexType.NON_CLUSTER_INDEX;
+	}
+
+	private String parseEntityAttribute(String fieldName) {
+		Matcher m = FIELD_NAME_PATTERN.matcher(fieldName);
+		if(m.find()){
+			return m.group(1);
+		}
+		return null;
+	}
+	
+	// TODO: We should move index rowkey generation later since this class is for general purpose, not only for hbase.
+	/**
+	 * Generates the index rowkey for an entity. For a non-unique index the sorted tag map of the entity
+	 * is passed on to the rowkey generator as well; for a unique index no tag map is used.
+	 *
+	 * @param entity entity to index; its class must be exactly the entity class of this definition
+	 * @return the index rowkey
+	 * @throws IllegalArgumentException if the entity is of a different class
+	 */
+	public byte[] generateIndexRowkey(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+		final Class<?> actualClass = entity.getClass();
+		final Class<?> expectedClass = entityDef.getEntityClass();
+		if (actualClass != expectedClass) {
+			throw new IllegalArgumentException("Expected entity class: " + expectedClass.getName() + ", but got class " + actualClass.getName());
+		}
+		final byte[][] columnValues = generateIndexValues(entity);
+		final int[] partitionHashes = generatePartitionHashCodes(entity);
+		SortedMap<Integer, Integer> sortedTags = null;
+		final boolean nonClusterIndex = !index.unique();
+		if (nonClusterIndex) {
+			sortedTags = RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags());
+		}
+		return generateUniqueIndexRowkey(columnValues, partitionHashes, sortedTags);
+	}
+	
+	/**
+	 * Assembles an index rowkey with the following layout:
+	 * <pre>
+	 * prefix hash (4B) | partition hash (4B) * P | [value length (2B) + value bytes] * V | [tag key hash (4B) + tag value hash (4B)] * T
+	 * </pre>
+	 * The tag section is present only when {@code tagMap} is non-null and non-empty.
+	 *
+	 * @param indexValues serialized index column values, one entry per index column
+	 * @param partitionHashCodes partition value hash codes, or null when the entity has no partitions
+	 * @param tagMap tag key hash to tag value hash, or null
+	 * @return the assembled rowkey
+	 */
+	private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] partitionHashCodes, SortedMap<Integer, Integer> tagMap) {
+		final boolean hasTags = tagMap != null && !tagMap.isEmpty();
+		
+		// Compute the exact key size up front.
+		int totalLength = 4;
+		if (partitionHashCodes != null) {
+			totalLength += 4 * partitionHashCodes.length;
+		}
+		for (byte[] value : indexValues) {
+			totalLength += 2 + value.length;
+		}
+		if (hasTags) {
+			totalLength += tagMap.size() * 8;
+		}
+		
+		int offset = 0;
+		final byte[] rowkey = new byte[totalLength];
+		
+		// 1. set prefix
+		ByteUtil.intToBytes(indexPrefix.hashCode(), rowkey, offset);
+		offset += 4;
+		
+		// 2. set partition
+		if (partitionHashCodes != null) {
+			for (int partitionHashCode : partitionHashCodes) {
+				ByteUtil.intToBytes(partitionHashCode, rowkey, offset);
+				offset += 4;
+			}
+		}
+		
+		// 3. set index values; iterate over the same array that the size computation used
+		for (byte[] value : indexValues) {
+			// The cast keeps only the low 16 bits of the length.
+			ByteUtil.shortToBytes((short) value.length, rowkey, offset);
+			offset += 2;
+			System.arraycopy(value, 0, rowkey, offset, value.length);
+			offset += value.length;
+		}
+		
+		// Check if it's non clustered index, then set the tag/value hash code
+		if (hasTags) {
+			// 4. set tag key/value hashes
+			for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) {
+				ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
+				offset += 4;
+				ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
+				offset += 4;
+			}
+		}
+		
+		return rowkey;
+	}
+
+	/**
+	 * Computes the hash code of the entity's tag value for every partition of the entity definition.
+	 * A partition whose tag is absent (or an entity without any tags) contributes
+	 * {@link #EMPTY_PARTITION_DEFAULT_HASH_CODE}.
+	 *
+	 * @param entity entity whose tags are read
+	 * @return one hash code per partition, in partition order, or null when the entity has no partitions
+	 */
+	private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) {
+		final String[] partitions = entityDef.getPartitions();
+		if (partitions == null) {
+			return null;
+		}
+		final int[] result = new int[partitions.length];
+		// Tags may be null; generateIndexValues tolerates that, so treat it here as "no tag present".
+		final Map<String, String> tags = entity.getTags();
+		for (int i = 0; i < partitions.length; ++i) {
+			final String tagValue = (tags == null) ? null : tags.get(partitions[i]);
+			result[i] = (tagValue != null) ? tagValue.hashCode() : EMPTY_PARTITION_DEFAULT_HASH_CODE;
+		}
+		return result;
+	}
+
+	/**
+	 * Serializes the value of every index column of the entity, in column order.
+	 * Tag columns are encoded as UTF-8 text; field columns are read through their bean getter and encoded
+	 * with the qualifier's serializer. Missing tags and null field values become an empty byte array.
+	 * The property descriptor of a field column is looked up once and then cached on the column.
+	 *
+	 * @param entity entity to read
+	 * @return one byte array per index column
+	 * @throws IllegalArgumentException if a serialized value is longer than {@link #MAX_INDEX_VALUE_BYTE_LENGTH}
+	 */
+	private byte[][] generateIndexValues(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+		final byte[][] values = new byte[columns.length][];
+		int position = 0;
+		for (IndexColumn column : columns) {
+			final String columnName = column.getColumnName();
+			byte[] encoded = EMPTY_VALUE;
+			if (column.isTag()) {
+				final Map<String, String> tags = entity.getTags();
+				final String tagValue = (tags == null) ? null : tags.get(columnName);
+				if (tagValue != null) {
+					encoded = tagValue.getBytes(UTF_8_CHARSET);
+				}
+			} else {
+				PropertyDescriptor descriptor = column.getPropertyDescriptor();
+				if (descriptor == null) {
+					descriptor = PropertyUtils.getPropertyDescriptor(entity, columnName);
+					column.setPropertyDescriptor(descriptor);
+				}
+				final Object fieldValue = descriptor.getReadMethod().invoke(entity);
+				if (fieldValue != null) {
+					encoded = column.getQualifier().getSerDeser().serialize(fieldValue);
+				}
+			}
+			if (encoded.length > MAX_INDEX_VALUE_BYTE_LENGTH) {
+				throw new IllegalArgumentException("Index field value exceeded the max length: " + MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: " + encoded.length);
+			}
+			values[position++] = encoded;
+		}
+		return values;
+	}
+	
+	/**
+	 * Index column definition class.
+	 * Holds the column name, whether the column is a tag, the qualifier used for a non-tag column,
+	 * and a settable property descriptor slot.
+	 */
+	public static class IndexColumn {
+		private final String columnName;
+		private final boolean isTag;
+		// The enclosing class passes null here for tag columns.
+		private final Qualifier qualifier;
+		// Not set by the constructor; the enclosing class fills it on first use as a cache.
+		private PropertyDescriptor propertyDescriptor;
+		
+		/**
+		 * @param columnName name of the indexed tag or field
+		 * @param isTag true when the column is a tag
+		 * @param qualifier qualifier of a non-tag column
+		 */
+		public IndexColumn(String columnName, boolean isTag, Qualifier qualifier) {
+			this.columnName = columnName;
+			this.isTag = isTag;
+			this.qualifier = qualifier;
+		}
+		
+		/** @return name of the indexed tag or field */
+		public String getColumnName() {
+			return columnName;
+		}
+		/** @return true when the column is a tag */
+		public boolean isTag() {
+			return isTag;
+		}
+		
+		/** @return the qualifier given at construction, possibly null */
+		public Qualifier getQualifier() {
+			return qualifier;
+		}
+
+		/** @return the stored property descriptor, or null if none has been set yet */
+		public PropertyDescriptor getPropertyDescriptor() {
+			return propertyDescriptor;
+		}
+
+		/** @param propertyDescriptor descriptor to store for later reuse */
+		public void setPropertyDescriptor(PropertyDescriptor propertyDescriptor) {
+			this.propertyDescriptor = propertyDescriptor;
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
new file mode 100644
index 0000000..3c82a0a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level container annotation holding an array of {@link Index} declarations.
+ * It is retained at runtime so it can be read reflectively.
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Indexes {
+
+	/** The contained index declarations. */
+	public Index[] value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
new file mode 100755
index 0000000..8831223
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializes an int array using the layout {@code <size><int>*size}:
+ * the first 4 bytes hold the number of elements, followed by 4 bytes per element.
+ */
+public class IntArraySerDeser implements EntitySerDeser<int[]>{
+
+	public IntArraySerDeser(){}
+
+	/**
+	 * Decodes an int array.
+	 *
+	 * @param bytes serialized form
+	 * @return the decoded array, or null when {@code bytes} is null or shorter than the 4-byte size header
+	 * @throws IllegalArgumentException if the declared element count is negative or exceeds the available bytes
+	 */
+	@Override
+	public int[] deserialize(byte[] bytes){
+		if(bytes == null || bytes.length < 4){
+			return null;
+		}
+		int offset = 0;
+		// get size of int array
+		final int size = ByteUtil.bytesToInt(bytes, offset);
+		offset += 4;
+		// Validate the header before allocating: a corrupted size must not trigger a huge allocation
+		// or a read past the end of the buffer. Long arithmetic avoids int overflow.
+		if(size < 0 || 4L * size > bytes.length - 4){
+			throw new IllegalArgumentException("Invalid int array content: declared size " + size
+					+ ", actual byte length " + bytes.length);
+		}
+		final int[] values = new int[size];
+		for(int i=0; i<size; i++){
+			values[i] = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+		}
+		return values;
+	}
+	
+	/**
+	 * Encodes an int array.
+	 *
+	 * @param obj array to encode
+	 * @return the 4-byte element count followed by 4 bytes per element, or null when {@code obj} is null
+	 */
+	@Override
+	public byte[] serialize(int[] obj){
+		if(obj == null){
+			return null;
+		}
+		final int size = obj.length;
+		final byte[] array = new byte[4 + 4*size];
+		int offset = 0;
+		// Write directly into the target buffer instead of allocating a temporary array per int.
+		ByteUtil.intToBytes(size, array, offset);
+		offset += 4;
+		for(int value : obj){
+			ByteUtil.intToBytes(value, array, offset);
+			offset += 4;
+		}
+		return array;
+	}
+
+	@Override
+	public Class<int[]> type() {
+		return int[].class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
new file mode 100755
index 0000000..695badd
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializer/deserializer for {@link Integer} values stored as 4 bytes.
+ */
+public class IntSerDeser implements EntitySerDeser<Integer>{
+	public IntSerDeser(){}
+
+	/**
+	 * @param bytes serialized form
+	 * @return the decoded value, or null when fewer than 4 bytes are supplied
+	 */
+	@Override
+	public Integer deserialize(byte[] bytes){
+		final boolean tooShort = bytes.length < 4;
+		return tooShort ? null : Integer.valueOf(ByteUtil.bytesToInt(bytes));
+	}
+	
+	/**
+	 * @param obj value to encode
+	 * @return the encoded bytes, or null when {@code obj} is null
+	 */
+	@Override
+	public byte[] serialize(Integer obj){
+		return (obj == null) ? null : ByteUtil.intToBytes(obj);
+	}
+
+	@Override
+	public Class<Integer> type() {
+		return Integer.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
new file mode 100644
index 0000000..eaf5e92
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serialization/deserialization for list type.
+ * Every element is stored together with the type ID of its serializer, so elements of different
+ * registered types can share one list.
+ */
+@SuppressWarnings("rawtypes")
+public class ListSerDeser implements EntitySerDeser<List> {
+
+	/**
+	 * Decodes a list written by {@link #serialize(List)}.
+	 *
+	 * @param bytes serialized form
+	 * @return the decoded list, or null when {@code bytes} is null or empty
+	 * @throws IllegalArgumentException if an element type ID is unknown or has no registered serializer
+	 */
+	@SuppressWarnings({ "unchecked" })
+	@Override
+	public List deserialize(byte[] bytes) {
+		if (bytes == null || bytes.length == 0) {
+			return null;
+		}
+		final List list = new ArrayList();
+		int offset = 0;
+		// number of elements in the list
+		final int size = ByteUtil.bytesToInt(bytes, offset);
+		offset += 4;
+		
+		for (int i = 0; i < size; ++i) {
+			final int valueID = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID);
+			if (valueClass == null) {
+				throw new IllegalArgumentException("Unsupported value type ID: " + valueID);
+			}
+			final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+			if (valueSerDer == null) {
+				// Same failure mode as the serialize side instead of a bare NullPointerException.
+				throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+			}
+			final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final byte[] valueContent = new byte[valueLength];
+			System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+			offset += valueLength;
+			final Object value = valueSerDer.deserialize(valueContent);
+			
+			list.add(value);
+		}
+		return list;
+	}
+
+	/**
+	 * Encodes a list using the layout:
+	 * <pre>
+	 *  size + value1 type id + value1 length + value1 binary content + ...
+	 *   4B         4B               4B               value1 bytes
+	 * </pre>
+	 * Elements whose concrete class has no type ID fall back to the {@code List} or {@code Map}
+	 * serializer when they implement one of these interfaces.
+	 *
+	 * @param list list to encode
+	 * @return the encoded bytes, or null when {@code list} is null
+	 * @throws IllegalArgumentException if an element is null, has no supported serializer, or serializes to null
+	 */
+	@SuppressWarnings({ "unchecked" })
+	@Override
+	public byte[] serialize(List list) {
+		if(list == null) {
+			return null;
+		}
+		final int size = list.size();
+		final int[] valueIDs = new int[size];
+		final byte[][] valueBytes = new byte[size][];
+		
+		// 4 bytes for the element count plus 8 bytes (type id + length) per element
+		int totalSize = 4 + size * 8;
+		int i = 0;
+		for (final Object value : list) {
+			if (value == null) {
+				// The format needs a type ID per element, which a null element cannot provide.
+				throw new IllegalArgumentException("Unsupported null element at index " + i);
+			}
+			Class<?> valueClass = value.getClass();
+			int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+
+			if (valueTypeID == -1) {
+				if (value instanceof List) {
+					valueClass = List.class;
+					valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+				}
+				else if (value instanceof Map) {
+					valueClass = Map.class;
+					valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+				}
+				else {
+					throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+				}
+			}
+			valueIDs[i] = valueTypeID;
+			final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+			if (valueSerDer == null) {
+				throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+			}
+			valueBytes[i] = valueSerDer.serialize(value);
+			if (valueBytes[i] == null) {
+				throw new IllegalArgumentException("Serializer for class " + valueClass.getName()
+						+ " returned null for element at index " + i);
+			}
+			totalSize += valueBytes[i].length;
+			++i;
+		}
+		final byte[] result = new byte[totalSize];
+		int offset = 0;
+		ByteUtil.intToBytes(size, result, offset);
+		offset += 4;
+		for (i = 0; i < size; ++i) {
+			ByteUtil.intToBytes(valueIDs[i], result, offset);
+			offset += 4;
+			ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+			offset += 4;
+			System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
+			offset += valueBytes[i].length;
+		}
+		return result;
+	}
+
+	@Override
+	public Class<List> type() {
+		return List.class;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
new file mode 100755
index 0000000..914cd95
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Converts {@link Long} values to and from their byte form by delegating to
+ * {@code ByteUtil.longToBytes} and {@code ByteUtil.bytesToLong}.
+ */
+public class LongSerDeser implements EntitySerDeser<Long>{
+	public LongSerDeser(){}
+
+	/**
+	 * Decodes a {@code Long} from the given buffer.
+	 *
+	 * @param bytes encoded value; may be {@code null}
+	 * @return the decoded value, or {@code null} when {@code bytes} is {@code null}
+	 *         or holds fewer than 8 bytes
+	 */
+	@Override
+	public Long deserialize(byte[] bytes){
+		// Treat a missing buffer like a too-short one: there is nothing to decode,
+		// so answer null instead of failing with a NullPointerException.
+		if(bytes == null || bytes.length < 8) {
+			return null;
+		}
+		return Long.valueOf(ByteUtil.bytesToLong(bytes));
+	}
+
+	/**
+	 * Encodes a {@code Long} into bytes.
+	 *
+	 * @param obj value to encode; may be {@code null}
+	 * @return the bytes produced by {@code ByteUtil.longToBytes}, or {@code null}
+	 *         when {@code obj} is {@code null}
+	 */
+	@Override
+	public byte[] serialize(Long obj){
+		if(obj == null) {
+			return null;
+		}
+		return ByteUtil.longToBytes(obj);
+	}
+
+	/**
+	 * Reports the Java type this serializer is declared for.
+	 *
+	 * @return {@code Long.class}
+	 */
+	@Override
+	public Class<Long> type() {
+		return Long.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
new file mode 100755
index 0000000..d16fe3a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Serialization/deserialization for map type
+ *
+ */
+@SuppressWarnings("rawtypes")
+public class MapSerDeser implements EntitySerDeser<Map> {
+
+	@SuppressWarnings({ "unchecked" })
+	@Override
+	public Map deserialize(byte[] bytes) {
+		if (bytes == null || bytes.length == 0) {
+			return null;
+		}
+		final Map map = new TreeMap();
+		int offset = 0;
+		// get size of int array
+		final int size = ByteUtil.bytesToInt(bytes, offset);
+		offset += 4;
+		
+		for (int i = 0; i < size; ++i) {
+			final int keyID = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final Class<?> keyClass = EntityDefinitionManager.getClassByID(keyID);
+			if (keyClass == null) {
+				throw new IllegalArgumentException("Unsupported key type ID: " + keyID);
+			}
+			final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass);
+			final int keyLength = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final byte[] keyContent = new byte[keyLength];
+			System.arraycopy(bytes, offset, keyContent, 0, keyLength);
+			offset += keyLength;
+			final Object key = keySerDer.deserialize(keyContent);
+			
+			final int valueID = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID);
+			if (valueClass == null) {
+				throw new IllegalArgumentException("Unsupported value type ID: " + valueID);
+			}
+			final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+			final int valueLength = ByteUtil.bytesToInt(bytes, offset);
+			offset += 4;
+			final byte[] valueContent = new byte[valueLength];
+			System.arraycopy(bytes, offset, valueContent, 0, valueLength);
+			offset += valueLength;
+			final Object value = valueSerDer.deserialize(valueContent);
+			
+			map.put(key, value);
+		}
+		return map;
+	}
+
+	/**
+	 *  size + key1 type ID + key1 length + key1 binary content + value1 type id + value length + value1 binary content + ...
+	 *   4B        4B             4B             key1 bytes            4B              4B              value1 bytes
+	 */
+	@SuppressWarnings({ "unchecked" })
+	@Override
+	public byte[] serialize(Map map) {
+		if(map == null)
+			return null;
+		final int size = map.size();
+		final int[] keyIDs = new int[size];
+		final int[] valueIDs = new int[size];
+		final byte[][] keyBytes = new byte[size][];
+		final byte[][] valueBytes = new byte[size][];
+		
+		int totalSize = 4 + size * 16;
+		int i = 0;
+		Iterator iter = map.entrySet().iterator();
+		while (iter.hasNext()) {
+			final Map.Entry entry = (Map.Entry)iter.next();
+			final Object key = entry.getKey();
+			final Object value = entry.getValue();
+			Class<?> keyClass = key.getClass();
+			Class<?> valueClass = NullObject.class;
+			if (value != null) {
+				valueClass = value.getClass();
+			}
+			int keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass);
+			int valueTypeID = 0; // default null object
+			if (valueClass != null) {
+				valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+			}
+			if (keyTypeID == -1) {
+				if (key instanceof Map) {
+					keyClass = Map.class;
+					keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass);
+				} else {
+					throw new IllegalArgumentException("Unsupported class: " + keyClass.getName());
+				}
+			}
+			if (valueTypeID == -1) {
+				if (value instanceof Map) {
+					valueClass = Map.class;
+					valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass);
+				} else {
+					throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+				}
+			}
+			keyIDs[i] = keyTypeID;
+			valueIDs[i] = valueTypeID;
+			final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass);
+			final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass);
+			if (keySerDer == null) {
+				throw new IllegalArgumentException("Unsupported class: " + keyClass.getName());
+			}
+			if (valueSerDer == null) {
+				throw new IllegalArgumentException("Unsupported class: " + valueClass.getName());
+			}
+			keyBytes[i] = keySerDer.serialize(key);
+			valueBytes[i] = valueSerDer.serialize(value);
+			totalSize += keyBytes[i].length + valueBytes[i].length;
+			++i;
+		}
+		final byte[] result = new byte[totalSize];
+		int offset = 0;
+		ByteUtil.intToBytes(size, result, offset);
+		offset += 4;
+		for (i = 0; i < size; ++i) {
+			ByteUtil.intToBytes(keyIDs[i], result, offset);
+			offset += 4;
+			ByteUtil.intToBytes(keyBytes[i].length, result, offset);
+			offset += 4;
+			System.arraycopy(keyBytes[i], 0, result, offset, keyBytes[i].length);
+			offset += keyBytes[i].length;
+			
+			ByteUtil.intToBytes(valueIDs[i], result, offset);
+			offset += 4;
+			ByteUtil.intToBytes(valueBytes[i].length, result, offset);
+			offset += 4;
+			System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length);
+			offset += valueBytes[i].length;
+		}
+		return result;
+	}
+
+	@Override
+	public Class<Map> type() {
+		return Map.class;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
new file mode 100644
index 0000000..0e3e776
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, visible at runtime, that carries a metric interval.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface Metric {
+	/**
+	 * Interval in milliseconds.
+	 *
+	 * @return the interval; 60000 when not specified
+	 */
+	long interval() default 60000L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
new file mode 100755
index 0000000..06bbed3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Hadoop {@link Writable} holding a metric interval and an optional entity class.
+ *
+ * <p>Wire format: the interval as a {@code long}, followed by the class name as a UTF string;
+ * an empty string stands for "no class".
+ */
+public class MetricDefinition implements Writable {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricDefinition.class);
+	/** Written in place of a class name when no class is set. */
+	private static final String EMPTY = "";
+
+	private long interval;
+	private Class<?> singleTimestampEntityClass;
+
+	public long getInterval() {
+		return interval;
+	}
+	public void setInterval(long interval) {
+		this.interval = interval;
+	}
+	public Class<?> getSingleTimestampEntityClass() {
+		return singleTimestampEntityClass;
+	}
+	public void setSingleTimestampEntityClass(Class<?> singleTimestampEntityClass) {
+		this.singleTimestampEntityClass = singleTimestampEntityClass;
+	}
+
+	/**
+	 * Writes the interval followed by the entity class name (or an empty string when unset).
+	 *
+	 * @param out destination stream
+	 * @throws IOException if writing to {@code out} fails
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		LOG.debug("Writing metric definition: interval = {} singleTimestampEntityClass = {}",
+				interval, this.singleTimestampEntityClass);
+		out.writeLong(interval);
+		if (this.singleTimestampEntityClass == null) {
+			out.writeUTF(EMPTY);
+		} else {
+			out.writeUTF(this.singleTimestampEntityClass.getName());
+		}
+	}
+
+	/**
+	 * Reads the interval and the entity class name, loading the class when a name is present.
+	 * The entity class is left {@code null} when the stream carries the empty marker or the
+	 * named class cannot be loaded.
+	 *
+	 * @param in source stream
+	 * @throws IOException if reading from {@code in} fails
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		interval = in.readLong();
+		final String className = in.readUTF();
+		// Writable instances may be reused for several records, so drop whatever class a
+		// previous read left behind before deciding what this record holds.
+		this.singleTimestampEntityClass = null;
+		if (!EMPTY.equals(className)) {
+			try {
+				this.singleTimestampEntityClass = Class.forName(className);
+			} catch (ClassNotFoundException e) {
+				// Best effort: keep going without the class, but always surface the problem at
+				// WARN level (not only when debug logging is on) and keep the cause.
+				LOG.warn("Class " + className + " not found ", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
new file mode 100755
index 0000000..9fb05a3
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation: applicable to types, retained at runtime, and declaring no elements.
+ * It is the element type of {@code NonUniqueIndexes.value()}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface NonUniqueIndex {
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
new file mode 100755
index 0000000..ff11397
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Container annotation that groups several {@link NonUniqueIndex} annotations on one type.
+ * Applicable to types and retained at runtime.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface NonUniqueIndexes {
+
+	/**
+	 * The grouped annotations.
+	 *
+	 * @return the {@code NonUniqueIndex} entries; there is no default, so a value is required
+	 */
+	NonUniqueIndex[] value();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
new file mode 100644
index 0000000..1b99fcd
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullObject.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * Placeholder type with no members. {@code MapSerDeser} uses {@code NullObject.class} as the
+ * value class when a map value is {@code null}, and {@code NullSerDeser} reports it from
+ * {@code type()}.
+ */
+public class NullObject {
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
new file mode 100755
index 0000000..1778788
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * Serializer for the {@link NullObject} placeholder: encoding always produces a zero-length
+ * array and decoding always produces {@code null}.
+ */
+public class NullSerDeser implements EntitySerDeser<NullObject>{
+
+	/** Shared zero-length array handed out by every {@link #serialize(NullObject)} call. */
+	private static final byte[] NO_BYTES = {};
+
+	/**
+	 * Ignores the input and yields {@code null}.
+	 *
+	 * @param bytes not inspected
+	 * @return always {@code null}
+	 */
+	@Override
+	public NullObject deserialize(byte[] bytes) {
+		return null;
+	}
+
+	/**
+	 * Ignores the input and yields the shared empty array.
+	 *
+	 * @param t not inspected
+	 * @return the same zero-length array on every call
+	 */
+	@Override
+	public byte[] serialize(NullObject t) {
+		return NO_BYTES;
+	}
+
+	/**
+	 * Reports the Java type this serializer is declared for.
+	 *
+	 * @return {@code NullObject.class}
+	 */
+	@Override
+	public Class<NullObject> type() {
+		return NullObject.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
new file mode 100644
index 0000000..cb60016
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares the partition fields of an Eagle entity, which affects how its rowkey is generated:
+ * the hash codes of the declared partition fields are placed right after the prefix field and
+ * before the timestamp field.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface Partition
+{
+    /**
+     * Order in which annotated tags are to be regarded as data partitions.
+     *
+     * @return the partition tag names in order; an empty array when not specified
+     */
+    String[] value() default {};
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
new file mode 100644
index 0000000..36f404c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, visible at runtime, that holds a single string value.
+ * NOTE(review): presumably the rowkey prefix that the {@code Partition} documentation
+ * refers to - confirm against the rowkey generation code.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface Prefix {
+	/**
+	 * The prefix string.
+	 *
+	 * @return the configured value; the empty string when not specified
+	 */
+	String value() default "";
+}
\ No newline at end of file