You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/08/10 09:21:16 UTC

svn commit: r1617055 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ hadoop-yarn/hadoop-yarn-server/had...

Author: junping_du
Date: Sun Aug 10 07:21:15 2014
New Revision: 1617055

URL: http://svn.apache.org/r1617055
Log:
YARN-2302. Refactor TimelineWebServices. (Contributed by Zhijie Shen)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sun Aug 10 07:21:15 2014
@@ -105,6 +105,8 @@ Release 2.6.0 - UNRELEASED
 
     YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
 
+    YARN-2302. Refactor TimelineWebServices. (Zhijie Shen via junping_du)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Sun Aug 10 07:21:15 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
@@ -59,12 +60,12 @@ public class ApplicationHistoryServer ex
   private static final Log LOG = LogFactory
     .getLog(ApplicationHistoryServer.class);
 
-  protected ApplicationHistoryClientService ahsClientService;
-  protected ApplicationHistoryManager historyManager;
-  protected TimelineStore timelineStore;
-  protected TimelineDelegationTokenSecretManagerService secretManagerService;
-  protected TimelineACLsManager timelineACLsManager;
-  protected WebApp webApp;
+  private ApplicationHistoryClientService ahsClientService;
+  private ApplicationHistoryManager historyManager;
+  private TimelineStore timelineStore;
+  private TimelineDelegationTokenSecretManagerService secretManagerService;
+  private TimelineDataManager timelineDataManager;
+  private WebApp webApp;
 
   public ApplicationHistoryServer() {
     super(ApplicationHistoryServer.class.getName());
@@ -72,15 +73,18 @@ public class ApplicationHistoryServer ex
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    historyManager = createApplicationHistory();
-    ahsClientService = createApplicationHistoryClientService(historyManager);
-    addService(ahsClientService);
-    addService((Service) historyManager);
+    // init timeline services first
     timelineStore = createTimelineStore(conf);
     addIfService(timelineStore);
     secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
     addService(secretManagerService);
-    timelineACLsManager = createTimelineACLsManager(conf);
+    timelineDataManager = createTimelineDataManager(conf);
+
+    // init generic history service afterwards
+    historyManager = createApplicationHistoryManager(conf);
+    ahsClientService = createApplicationHistoryClientService(historyManager);
+    addService(ahsClientService);
+    addService((Service) historyManager);
 
     DefaultMetricsSystem.initialize("ApplicationHistoryServer");
     JvmMetrics.initSingleton("ApplicationHistoryServer", null);
@@ -111,21 +115,22 @@ public class ApplicationHistoryServer ex
 
   @Private
   @VisibleForTesting
-  public ApplicationHistoryClientService getClientService() {
+  ApplicationHistoryClientService getClientService() {
     return this.ahsClientService;
   }
 
-  protected ApplicationHistoryClientService
-      createApplicationHistoryClientService(
-          ApplicationHistoryManager historyManager) {
-    return new ApplicationHistoryClientService(historyManager);
-  }
-
-  protected ApplicationHistoryManager createApplicationHistory() {
-    return new ApplicationHistoryManagerImpl();
+  /**
+   * @return the timeline store used by this server
+   */
+  @Private
+  @VisibleForTesting
+  public TimelineStore getTimelineStore() {
+    return timelineStore;
   }
 
-  protected ApplicationHistoryManager getApplicationHistory() {
+  @Private
+  @VisibleForTesting
+  ApplicationHistoryManager getApplicationHistoryManager() {
     return this.historyManager;
   }
 
@@ -154,28 +159,35 @@ public class ApplicationHistoryServer ex
     launchAppHistoryServer(args);
   }
 
-  protected ApplicationHistoryManager createApplicationHistoryManager(
+  private ApplicationHistoryClientService
+      createApplicationHistoryClientService(
+          ApplicationHistoryManager historyManager) {
+    return new ApplicationHistoryClientService(historyManager);
+  }
+
+  private ApplicationHistoryManager createApplicationHistoryManager(
       Configuration conf) {
     return new ApplicationHistoryManagerImpl();
   }
 
-  protected TimelineStore createTimelineStore(
+  private TimelineStore createTimelineStore(
       Configuration conf) {
     return ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
         TimelineStore.class), conf);
   }
 
-  protected TimelineDelegationTokenSecretManagerService
+  private TimelineDelegationTokenSecretManagerService
       createTimelineDelegationTokenSecretManagerService(Configuration conf) {
     return new TimelineDelegationTokenSecretManagerService();
   }
 
-  protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
-    return new TimelineACLsManager(conf);
+  private TimelineDataManager createTimelineDataManager(Configuration conf) {
+    return new TimelineDataManager(
+        timelineStore, new TimelineACLsManager(conf));
   }
 
-  protected void startWebApp() {
+  private void startWebApp() {
     Configuration conf = getConfig();
     // Always load pseudo authentication filter to parse "user.name" in an URL
     // to identify a HTTP request's user in insecure mode.
@@ -199,9 +211,8 @@ public class ApplicationHistoryServer ex
     try {
       AHSWebApp ahsWebApp = AHSWebApp.getInstance();
       ahsWebApp.setApplicationHistoryManager(historyManager);
-      ahsWebApp.setTimelineStore(timelineStore);
       ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
-      ahsWebApp.setTimelineACLsManager(timelineACLsManager);
+      ahsWebApp.setTimelineDataManager(timelineDataManager);
       webApp =
           WebApps
             .$for("applicationhistory", ApplicationHistoryClientService.class,
@@ -213,14 +224,6 @@ public class ApplicationHistoryServer ex
       throw new YarnRuntimeException(msg, e);
     }
   }
-  /**
-   * @return ApplicationTimelineStore
-   */
-  @Private
-  @VisibleForTesting
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
 
   private void doSecureLogin(Configuration conf) throws IOException {
     InetSocketAddress socAddr = getBindAddress(conf);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Sun Aug 10 07:21:15 2014
@@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.server.api.ApplicationContext;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
 import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -36,9 +35,8 @@ import com.google.common.annotations.Vis
 public class AHSWebApp extends WebApp implements YarnWebParams {
 
   private ApplicationHistoryManager applicationHistoryManager;
-  private TimelineStore timelineStore;
   private TimelineDelegationTokenSecretManagerService secretManagerService;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   private static AHSWebApp instance = null;
 
@@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im
     this.applicationHistoryManager = applicationHistoryManager;
   }
 
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
-
-  public void setTimelineStore(TimelineStore timelineStore) {
-    this.timelineStore = timelineStore;
-  }
-
   public TimelineDelegationTokenSecretManagerService
       getTimelineDelegationTokenSecretManagerService() {
     return secretManagerService;
@@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im
     this.secretManagerService = secretManagerService;
   }
 
-  public TimelineACLsManager getTimelineACLsManager() {
-    return timelineACLsManager;
+  public TimelineDataManager getTimelineDataManager() {
+    return timelineDataManager;
   }
 
-  public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
-    this.timelineACLsManager = timelineACLsManager;
+  public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @Override
@@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im
     bind(TimelineWebServices.class);
     bind(GenericExceptionHandler.class);
     bind(ApplicationContext.class).toInstance(applicationHistoryManager);
-    bind(TimelineStore.class).toInstance(timelineStore);
     bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
         secretManagerService);
-    bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+    bind(TimelineDataManager.class).toInstance(timelineDataManager);
     route("/", AHSController.class);
     route(pajoin("/apps", APP_STATE), AHSController.class);
     route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java?rev=1617055&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java Sun Aug 10 07:21:15 2014
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class wraps the timeline store and the ACLs manager. It does some
+ * non-trivial manipulation of the timeline data before putting it into or after
+ * getting it from the timeline store, and checks the user's access to it.
+ * 
+ */
+public class TimelineDataManager {
+
+  private static final Log LOG = LogFactory.getLog(TimelineDataManager.class);
+
+  private TimelineStore store;
+  private TimelineACLsManager timelineACLsManager;
+
+  public TimelineDataManager(TimelineStore store,
+      TimelineACLsManager timelineACLsManager) {
+    this.store = store;
+    this.timelineACLsManager = timelineACLsManager;
+  }
+
+  /**
+   * Get the timeline entities that the given user has access to. The meaning
+   * of each argument has been documented with
+   * {@link TimelineReader#getEntities}.
+   * 
+   * @see TimelineReader#getEntities
+   */
+  public TimelineEntities getEntities(
+      String entityType,
+      NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilter,
+      Long windowStart,
+      Long windowEnd,
+      String fromId,
+      Long fromTs,
+      Long limit,
+      EnumSet<Field> fields,
+      UserGroupInformation callerUGI) throws YarnException, IOException {
+    TimelineEntities entities = null;
+    boolean modified = extendFields(fields);
+    entities = store.getEntities(
+        entityType,
+        limit,
+        windowStart,
+        windowEnd,
+        fromId,
+        fromTs,
+        primaryFilter,
+        secondaryFilter,
+        fields);
+    if (entities != null) {
+      Iterator<TimelineEntity> entitiesItr =
+          entities.getEntities().iterator();
+      while (entitiesItr.hasNext()) {
+        TimelineEntity entity = entitiesItr.next();
+        try {
+          // check ACLs
+          if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+            entitiesItr.remove();
+          } else {
+            // clean up system data
+            if (modified) {
+              entity.setPrimaryFilters(null);
+            } else {
+              cleanupOwnerInfo(entity);
+            }
+          }
+        } catch (YarnException e) {
+          LOG.error("Error when verifying access for user " + callerUGI
+              + " on the events of the timeline entity "
+              + new EntityIdentifier(entity.getEntityId(),
+                  entity.getEntityType()), e);
+          entitiesItr.remove();
+        }
+      }
+    }
+    if (entities == null) {
+      return new TimelineEntities();
+    }
+    return entities;
+  }
+
+  /**
+   * Get the single timeline entity that the given user has access to. The
+   * meaning of each argument has been documented with
+   * {@link TimelineReader#getEntity}.
+   * 
+   * @see TimelineReader#getEntity
+   */
+  public TimelineEntity getEntity(
+      String entityType,
+      String entityId,
+      EnumSet<Field> fields,
+      UserGroupInformation callerUGI) throws YarnException, IOException {
+    TimelineEntity entity = null;
+    boolean modified = extendFields(fields);
+    entity =
+        store.getEntity(entityId, entityType, fields);
+    if (entity != null) {
+      // check ACLs
+      if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+        entity = null;
+      } else {
+        // clean up the system data
+        if (modified) {
+          entity.setPrimaryFilters(null);
+        } else {
+          cleanupOwnerInfo(entity);
+        }
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Get the events whose entities the given user has access to. The meaning of
+   * each argument has been documented with
+   * {@link TimelineReader#getEntityTimelines}.
+   * 
+   * @see TimelineReader#getEntityTimelines
+   */
+  public TimelineEvents getEvents(
+      String entityType,
+      SortedSet<String> entityIds,
+      SortedSet<String> eventTypes,
+      Long windowStart,
+      Long windowEnd,
+      Long limit,
+      UserGroupInformation callerUGI) throws YarnException, IOException {
+    TimelineEvents events = null;
+    events = store.getEntityTimelines(
+        entityType,
+        entityIds,
+        limit,
+        windowStart,
+        windowEnd,
+        eventTypes);
+    if (events != null) {
+      Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
+          events.getAllEvents().iterator();
+      while (eventsItr.hasNext()) {
+        TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
+        try {
+          TimelineEntity entity = store.getEntity(
+              eventsOfOneEntity.getEntityId(),
+              eventsOfOneEntity.getEntityType(),
+              EnumSet.of(Field.PRIMARY_FILTERS));
+          // check ACLs
+          if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
+            eventsItr.remove();
+          }
+        } catch (Exception e) {
+          LOG.error("Error when verifying access for user " + callerUGI
+              + " on the events of the timeline entity "
+              + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
+                  eventsOfOneEntity.getEntityType()), e);
+          eventsItr.remove();
+        }
+      }
+    }
+    if (events == null) {
+      return new TimelineEvents();
+    }
+    return events;
+  }
+
+  /**
+   * Store the timeline entities into the store and set the owner of them to the
+   * given user.
+   */
+  public TimelinePutResponse postEntities(
+      TimelineEntities entities,
+      UserGroupInformation callerUGI) throws YarnException, IOException {
+    if (entities == null) {
+      return new TimelinePutResponse();
+    }
+    List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
+    TimelineEntities entitiesToPut = new TimelineEntities();
+    List<TimelinePutResponse.TimelinePutError> errors =
+        new ArrayList<TimelinePutResponse.TimelinePutError>();
+    for (TimelineEntity entity : entities.getEntities()) {
+      EntityIdentifier entityID =
+          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+
+      // check if there is existing entity
+      TimelineEntity existingEntity = null;
+      try {
+        existingEntity =
+            store.getEntity(entityID.getId(), entityID.getType(),
+                EnumSet.of(Field.PRIMARY_FILTERS));
+        if (existingEntity != null
+            && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
+          throw new YarnException("The timeline entity " + entityID
+              + " was not put by " + callerUGI + " before");
+        }
+      } catch (Exception e) {
+        // Skip the entity if it was put by others or if the lookup failed
+        LOG.error("Skip the timeline entity: " + entityID + ", because "
+            + e.getMessage());
+        TimelinePutResponse.TimelinePutError error =
+            new TimelinePutResponse.TimelinePutError();
+        error.setEntityId(entityID.getId());
+        error.setEntityType(entityID.getType());
+        error.setErrorCode(
+            TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
+        errors.add(error);
+        continue;
+      }
+
+      // inject owner information for the access check only when the entity is
+      // posted for the first time, in case it is the admin who is updating
+      // existing timeline data.
+      try {
+        if (existingEntity == null) {
+          injectOwnerInfo(entity, callerUGI.getShortUserName());
+        }
+      } catch (YarnException e) {
+        // Skip the entity which messes up the primary filter and record the
+        // error
+        LOG.error("Skip the timeline entity: " + entityID + ", because "
+            + e.getMessage());
+        TimelinePutResponse.TimelinePutError error =
+            new TimelinePutResponse.TimelinePutError();
+        error.setEntityId(entityID.getId());
+        error.setEntityType(entityID.getType());
+        error.setErrorCode(
+            TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
+        errors.add(error);
+        continue;
+      }
+
+      entityIDs.add(entityID);
+      entitiesToPut.addEntity(entity);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
+    }
+    TimelinePutResponse response = store.put(entitiesToPut);
+    // add the access-denied and system filter conflict errors recorded above
+    response.addErrors(errors);
+    return response;
+  }
+
+  private static boolean extendFields(EnumSet<Field> fieldEnums) {
+    boolean modified = false;
+    if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
+      fieldEnums.add(Field.PRIMARY_FILTERS);
+      modified = true;
+    }
+    return modified;
+  }
+
+  private static void injectOwnerInfo(TimelineEntity timelineEntity,
+      String owner) throws YarnException {
+    if (timelineEntity.getPrimaryFilters() != null &&
+        timelineEntity.getPrimaryFilters().containsKey(
+            TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
+      throw new YarnException(
+          "User should not use the timeline system filter key: "
+              + TimelineStore.SystemFilter.ENTITY_OWNER);
+    }
+    timelineEntity.addPrimaryFilter(
+        TimelineStore.SystemFilter.ENTITY_OWNER
+            .toString(), owner);
+  }
+
+  private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
+    if (timelineEntity.getPrimaryFilters() != null) {
+      timelineEntity.getPrimaryFilters().remove(
+          TimelineStore.SystemFilter.ENTITY_OWNER.toString());
+    }
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Sun Aug 10 07:21:15 2014
@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.yarn.server.timeline.webapp;
 
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -80,14 +73,11 @@ public class TimelineWebServices {
 
   private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
 
-  private TimelineStore store;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   @Inject
-  public TimelineWebServices(TimelineStore store,
-      TimelineACLsManager timelineACLsManager) {
-    this.store = store;
-    this.timelineACLsManager = timelineACLsManager;
+  public TimelineWebServices(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @XmlRootElement(name = "about")
@@ -148,61 +138,28 @@ public class TimelineWebServices {
       @QueryParam("limit") String limit,
       @QueryParam("fields") String fields) {
     init(res);
-    TimelineEntities entities = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      UserGroupInformation callerUGI = getUser(req);
-      entities = store.getEntities(
+      return timelineDataManager.getEntities(
           parseStr(entityType),
-          parseLongStr(limit),
+          parsePairStr(primaryFilter, ":"),
+          parsePairsStr(secondaryFilter, ",", ":"),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
           parseStr(fromId),
           parseLongStr(fromTs),
-          parsePairStr(primaryFilter, ":"),
-          parsePairsStr(secondaryFilter, ",", ":"),
-          fieldEnums);
-      if (entities != null) {
-        Iterator<TimelineEntity> entitiesItr =
-            entities.getEntities().iterator();
-        while (entitiesItr.hasNext()) {
-          TimelineEntity entity = entitiesItr.next();
-          try {
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              entitiesItr.remove();
-            } else {
-              // clean up system data
-              if (modified) {
-                entity.setPrimaryFilters(null);
-              } else {
-                cleanupOwnerInfo(entity);
-              }
-            }
-          } catch (YarnException e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(entity.getEntityId(),
-                    entity.getEntityType()), e);
-            entitiesItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
     } catch (IllegalArgumentException e) {
       throw new BadRequestException("requested invalid field.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (entities == null) {
-      return new TimelineEntities();
-    }
-    return entities;
   }
 
   /**
@@ -220,33 +177,15 @@ public class TimelineWebServices {
     init(res);
     TimelineEntity entity = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      entity =
-          store.getEntity(parseStr(entityId), parseStr(entityType),
-              fieldEnums);
-      if (entity != null) {
-        // check ACLs
-        UserGroupInformation callerUGI = getUser(req);
-        if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-          entity = null;
-        } else {
-          // clean up the system data
-          if (modified) {
-            entity.setPrimaryFilters(null);
-          } else {
-            cleanupOwnerInfo(entity);
-          }
-        }
-      }
+      entity = timelineDataManager.getEntity(
+          parseStr(entityType),
+          parseStr(entityId),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (IllegalArgumentException e) {
       throw new BadRequestException(
           "requested invalid field.");
-    } catch (IOException e) {
-      LOG.error("Error getting entity", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    } catch (YarnException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -275,51 +214,23 @@ public class TimelineWebServices {
       @QueryParam("windowEnd") String windowEnd,
       @QueryParam("limit") String limit) {
     init(res);
-    TimelineEvents events = null;
     try {
-      UserGroupInformation callerUGI = getUser(req);
-      events = store.getEntityTimelines(
+      return timelineDataManager.getEvents(
           parseStr(entityType),
           parseArrayStr(entityId, ","),
-          parseLongStr(limit),
+          parseArrayStr(eventType, ","),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
-          parseArrayStr(eventType, ","));
-      if (events != null) {
-        Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
-            events.getAllEvents().iterator();
-        while (eventsItr.hasNext()) {
-          TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
-          try {
-            TimelineEntity entity = store.getEntity(
-                eventsOfOneEntity.getEntityId(),
-                eventsOfOneEntity.getEntityType(),
-                EnumSet.of(Field.PRIMARY_FILTERS));
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              eventsItr.remove();
-            }
-          } catch (Exception e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
-                    eventsOfOneEntity.getEntityType()), e);
-            eventsItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity timelines", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (events == null) {
-      return new TimelineEvents();
-    }
-    return events;
   }
 
   /**
@@ -333,9 +244,6 @@ public class TimelineWebServices {
       @Context HttpServletResponse res,
       TimelineEntities entities) {
     init(res);
-    if (entities == null) {
-      return new TimelinePutResponse();
-    }
     UserGroupInformation callerUGI = getUser(req);
     if (callerUGI == null) {
       String msg = "The owner of the posted timeline entities is not set";
@@ -343,76 +251,8 @@ public class TimelineWebServices {
       throw new ForbiddenException(msg);
     }
     try {
-      List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
-      TimelineEntities entitiesToPut = new TimelineEntities();
-      List<TimelinePutResponse.TimelinePutError> errors =
-          new ArrayList<TimelinePutResponse.TimelinePutError>();
-      for (TimelineEntity entity : entities.getEntities()) {
-        EntityIdentifier entityID =
-            new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
-
-        // check if there is existing entity
-        TimelineEntity existingEntity = null;
-        try {
-          existingEntity =
-              store.getEntity(entityID.getId(), entityID.getType(),
-                  EnumSet.of(Field.PRIMARY_FILTERS));
-          if (existingEntity != null
-              && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
-            throw new YarnException("The timeline entity " + entityID
-                + " was not put by " + callerUGI + " before");
-          }
-        } catch (Exception e) {
-          // Skip the entity which already exists and was put by others
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
-          errors.add(error);
-          continue;
-        }
-
-        // inject owner information for the access check if this is the first
-        // time to post the entity, in case it's the admin who is updating
-        // the timeline data.
-        try {
-          if (existingEntity == null) {
-            injectOwnerInfo(entity, callerUGI.getShortUserName());
-          }
-        } catch (YarnException e) {
-          // Skip the entity which messes up the primary filter and record the
-          // error
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
-          errors.add(error);
-          continue;
-        }
-
-        entityIDs.add(entityID);
-        entitiesToPut.addEntity(entity);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
-              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
-      }
-      TimelinePutResponse response =  store.put(entitiesToPut);
-      // add the errors of timeline system filter key conflict
-      response.addErrors(errors);
-      return response;
-    } catch (IOException e) {
+      return timelineDataManager.postEntities(entities, callerUGI);
+    } catch (Exception e) {
       LOG.error("Error putting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -423,6 +263,15 @@ public class TimelineWebServices {
     response.setContentType(null);
   }
 
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
   private static SortedSet<String> parseArrayStr(String str, String delimiter) {
     if (str == null) {
       return null;
@@ -495,14 +344,6 @@ public class TimelineWebServices {
     }
   }
 
-  private static boolean extendFields(EnumSet<Field> fieldEnums) {
-    boolean modified = false;
-    if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
-      fieldEnums.add(Field.PRIMARY_FILTERS);
-      modified = true;
-    }
-    return modified;
-  }
   private static Long parseLongStr(String str) {
     return str == null ? null : Long.parseLong(str.trim());
   }
@@ -511,34 +352,4 @@ public class TimelineWebServices {
     return str == null ? null : str.trim();
   }
 
-  private static UserGroupInformation getUser(HttpServletRequest req) {
-    String remoteUser = req.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
-    return callerUGI;
-  }
-
-  private static void injectOwnerInfo(TimelineEntity timelineEntity,
-      String owner) throws YarnException {
-    if (timelineEntity.getPrimaryFilters() != null &&
-        timelineEntity.getPrimaryFilters().containsKey(
-            TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
-      throw new YarnException(
-          "User should not use the timeline system filter key: "
-              + TimelineStore.SystemFilter.ENTITY_OWNER);
-    }
-    timelineEntity.addPrimaryFilter(
-        TimelineStore.SystemFilter.ENTITY_OWNER
-            .toString(), owner);
-  }
-
-  private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
-    if (timelineEntity.getPrimaryFilters() != null) {
-      timelineEntity.getPrimaryFilters().remove(
-          TimelineStore.SystemFilter.ENTITY_OWNER.toString());
-    }
-  }
-
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Sun Aug 10 07:21:15 2014
@@ -69,7 +69,7 @@ public class TestApplicationHistoryClientService
     historyServer.init(config);
     historyServer.start();
     store =
-        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
+        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
           .getHistoryStore();
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1617055&r1=1617054&r2=1617055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Sun Aug 10 07:21:15 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AdminACLsManager;
 import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
@@ -89,14 +90,15 @@ public class TestTimelineWebServices ext
       } catch (Exception e) {
         Assert.fail();
       }
-      bind(TimelineStore.class).toInstance(store);
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
       timelineACLsManager = new TimelineACLsManager(conf);
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
       conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
       adminACLsManager = new AdminACLsManager(conf);
-      bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+      TimelineDataManager timelineDataManager =
+          new TimelineDataManager(store, timelineACLsManager);
+      bind(TimelineDataManager.class).toInstance(timelineDataManager);
       serve("/*").with(GuiceContainer.class);
       TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
       FilterConfig filterConfig = mock(FilterConfig.class);