Posted to commits@flink.apache.org by se...@apache.org on 2015/05/11 22:19:41 UTC

[7/8] flink git commit: [FLINK-1969] [runtime] Remove deprecated profiler code

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
deleted file mode 100644
index e07f144..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.types.StringValue;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A single instance profiling event encapsulates profiling information for one particular instance.
- */
-public final class SingleInstanceProfilingEvent extends InstanceProfilingEvent {
-
-	private static final long serialVersionUID = 1L;
-	
-	private String instanceName;
-
-	/**
-	 * Constructs a new instance profiling event.
-	 * 
-	 * @param profilingInterval
-	 *        the interval of time this profiling event covers in milliseconds
-	 * @param ioWaitCPU
-	 *        the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval
-	 * @param idleCPU
-	 *        the percentage of time the CPU(s) spent in state IDLE during the profiling interval
-	 * @param userCPU
-	 *        the percentage of time the CPU(s) spent in state USER during the profiling interval
-	 * @param systemCPU
-	 *        the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval
-	 * @param hardIrqCPU
-	 *        the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval
-	 * @param softIrqCPU
-	 *        the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval
-	 * @param totalMemory
-	 *        the total amount of this instance's main memory in bytes
-	 * @param freeMemory
-	 *        the free amount of this instance's main memory in bytes
-	 * @param bufferedMemory
-	 *        the amount of main memory the instance uses for file buffers
-	 * @param cachedMemory
-	 *        the amount of main memory the instance uses as cache memory
-	 * @param cachedSwapMemory
-	 *        The amount of main memory the instance uses for cached swaps
-	 * @param receivedBytes
-	 *        the number of bytes received via network during the profiling interval
-	 * @param transmittedBytes
-	 *        the number of bytes transmitted via network during the profiling interval
-	 * @param jobID
-	 *        the ID of this job this profiling event belongs to
-	 * @param timestamp
-	 *        the time stamp of this profiling event's creation
-	 * @param profilingTimestamp
-	 *        the time stamp relative to the beginning of the job's execution
-	 * @param instanceName
-	 *        the name of the instance this profiling event refers to
-	 */
-	public SingleInstanceProfilingEvent(final int profilingInterval, final int ioWaitCPU, final int idleCPU,
-			final int userCPU, final int systemCPU, final int hardIrqCPU, final int softIrqCPU, final long totalMemory,
-			final long freeMemory, final long bufferedMemory, final long cachedMemory, final long cachedSwapMemory,
-			final long receivedBytes, final long transmittedBytes, final JobID jobID, final long timestamp,
-			final long profilingTimestamp, final String instanceName)
-	{
-		super(profilingInterval, ioWaitCPU, idleCPU, userCPU, systemCPU, hardIrqCPU, softIrqCPU, totalMemory,
-			freeMemory, bufferedMemory, cachedMemory, cachedSwapMemory, receivedBytes, transmittedBytes, jobID,
-			timestamp, profilingTimestamp);
-
-		Preconditions.checkNotNull(instanceName);
-		this.instanceName = instanceName;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public SingleInstanceProfilingEvent() {
-		super();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Returns the name of the instance.
-	 * 
-	 * @return the name of the instance
-	 */
-	public String getInstanceName() {
-		return this.instanceName;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Serialization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-		this.instanceName = StringValue.readString(in);
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-		StringValue.writeString(this.instanceName, out);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SingleInstanceProfilingEvent) {
-			SingleInstanceProfilingEvent other = (SingleInstanceProfilingEvent) obj;
-			return super.equals(obj) && this.instanceName.equals(other.instanceName);
-			
-		}
-		else {
-			return false;
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return super.hashCode() + 31*instanceName.hashCode();
-	}
-}

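For context on the removed types: the profiling events implemented Flink's manual read()/write() serialization, where both methods must visit the same fields in the same order (superclass fields first, then the instance name via StringValue). A minimal standalone sketch of that pattern, using a hypothetical ExampleEvent class that is not part of the codebase and assuming the IOReadableWritable interface from flink-core:

    import java.io.IOException

    import org.apache.flink.core.io.IOReadableWritable
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    import org.apache.flink.types.StringValue

    // Hypothetical event (not part of this commit): write() and read() must visit
    // the same fields in the same order, which is the contract the removed
    // profiling events followed.
    class ExampleEvent extends IOReadableWritable {

      private var counter: Long = 0L
      private var name: String = ""

      @throws[IOException]
      override def write(out: DataOutputView): Unit = {
        out.writeLong(counter)              // field order defines the wire format
        StringValue.writeString(name, out)  // same helper the removed events used
      }

      @throws[IOException]
      override def read(in: DataInputView): Unit = {
        counter = in.readLong()             // must mirror write() exactly
        name = StringValue.readString(in)
      }
    }
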
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
deleted file mode 100644
index 08b932a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Through this interface it is possible to access profiling data about the CPU utilization
- * of the corresponding execution thread during its execution.
- */
-public class ThreadProfilingEvent extends VertexProfilingEvent {
-
-	private static final long serialVersionUID = -3006867830244444710L;
-
-	private int userTime;
-
-	private int systemTime;
-
-	private int blockedTime;
-
-	private int waitedTime;
-
-	public ThreadProfilingEvent(int userTime, int systemTime, int blockedTime, int waitedTime,
-			JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-			int profilingInterval, JobID jobID, long timestamp, long profilingTimestamp)
-	{
-		super(vertexId, subtask, executionId, profilingInterval, jobID, timestamp, profilingTimestamp);
-
-		this.userTime = userTime;
-		this.systemTime = systemTime;
-		this.blockedTime = blockedTime;
-		this.waitedTime = waitedTime;
-	}
-
-	public ThreadProfilingEvent() {
-		super();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns the percentage of time the execution thread spent in
-	 * user mode in the given profiling interval.
-	 * 
-	 * @return the percentage of time spent in user mode
-	 */
-	public int getUserTime() {
-		return this.userTime;
-	}
-
-	/**
-	 * Returns the percentage of time the execution thread spent in
-	 * system mode in the given profiling interval.
-	 * 
-	 * @return the percentage of time spent in system mode
-	 */
-	public int getSystemTime() {
-		return this.systemTime;
-	}
-
-	/**
-	 * Returns the percentage of time the execution thread has been
-	 * blocked to enter or reenter a monitor in the given profiling interval.
-	 * 
-	 * @return the percentage of time the thread has been blocked
-	 */
-	public int getBlockedTime() {
-		return this.blockedTime;
-	}
-
-	/**
-	 * Returns the percentage of time the execution thread spent in
-	 * either <code>WAITING</code> or <code>TIMED_WAITING</code> state in the given profiling interval.
-	 * 
-	 * @return the percentage of time the thread spent waiting
-	 */
-	public int getWaitedTime() {
-		return this.waitedTime;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Serialization
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-
-		this.userTime = in.readInt();
-		this.systemTime = in.readInt();
-		this.blockedTime = in.readInt();
-		this.waitedTime = in.readInt();
-	}
-
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-
-		out.writeInt(this.userTime);
-		out.writeInt(this.systemTime);
-		out.writeInt(this.blockedTime);
-		out.writeInt(this.waitedTime);
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ThreadProfilingEvent) {
-			final ThreadProfilingEvent other = (ThreadProfilingEvent) obj;
-			
-			return this.userTime == other.userTime &&
-					this.systemTime == other.systemTime &&
-					this.blockedTime == other.blockedTime &&
-					this.waitedTime == other.waitedTime && 
-					super.equals(obj);
-		}
-		else {
-			return false;
-		}
-	}
-	
-	// hash code is inherited from the superclass
-}

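The user/system/blocked/waited figures carried by ThreadProfilingEvent originated from JMX thread monitoring; the removed EnvironmentThreadSet turned consecutive samples into percentages over the profiling interval. A hypothetical sketch of the underlying ThreadMXBean calls (standard JDK API, not part of this commit; object name is made up):

    import java.lang.management.ManagementFactory

    // Hypothetical sketch: the raw per-thread figures from which
    // user/system/blocked/waited values can be derived.
    object ThreadSampleSketch {

      def sample(threadId: Long): Unit = {
        val tmx = ManagementFactory.getThreadMXBean

        // blocked/waited times are only reported if contention monitoring is on
        if (tmx.isThreadContentionMonitoringSupported) {
          tmx.setThreadContentionMonitoringEnabled(true)
        }

        val cpuNanos  = tmx.getThreadCpuTime(threadId)   // total CPU time (user + system), ns
        val userNanos = tmx.getThreadUserTime(threadId)  // user-mode CPU time, ns

        val info = tmx.getThreadInfo(threadId)
        if (info != null) {
          val blockedMillis = info.getBlockedTime        // blocked on monitors, ms
          val waitedMillis  = info.getWaitedTime         // WAITING / TIMED_WAITING, ms
          println(s"cpu=$cpuNanos ns, user=$userNanos ns, " +
            s"blocked=$blockedMillis ms, waited=$waitedMillis ms")
        }
      }
    }
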
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
deleted file mode 100644
index fc18758..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * This interface is a base interface for profiling data which
- * pertains to the execution of tasks.
- */
-public abstract class VertexProfilingEvent extends ProfilingEvent {
-
-	private static final long serialVersionUID = -5364961557518174880L;
-
-	private final JobVertexID vertexId;
-	
-	private int subtask;
-	
-	private final ExecutionAttemptID executionId;
-
-	private int profilingInterval;
-
-	
-	public VertexProfilingEvent(JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-			int profilingInterval, JobID jobID, long timestamp, long profilingTimestamp)
-	{
-		super(jobID, timestamp, profilingTimestamp);
-
-		this.vertexId = vertexId;
-		this.subtask = subtask;
-		this.executionId = executionId;
-		this.profilingInterval = profilingInterval;
-	}
-
-	public VertexProfilingEvent() {
-		super();
-		this.vertexId = new JobVertexID();
-		this.executionId = new ExecutionAttemptID();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns the ID of the vertex this profiling information
-	 * belongs to.
-	 * 
-	 * @return the ID of the vertex this profiling information belongs to
-	 */
-	public JobVertexID getVertexID() {
-		return this.vertexId;
-	}
-
-	/**
-	 * The interval in milliseconds to which the rest
-	 * of the profiling data relates to.
-	 * 
-	 * @return the profiling interval given in milliseconds
-	 */
-	public int getProfilingInterval() {
-		return this.profilingInterval;
-	}
-	
-	public int getSubtask() {
-		return subtask;
-	}
-	
-	public ExecutionAttemptID getExecutionId() {
-		return executionId;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Serialization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-
-		this.vertexId.read(in);
-		this.executionId.read(in);
-		this.subtask = in.readInt();
-		this.profilingInterval = in.readInt();
-	}
-
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-
-		this.vertexId.write(out);
-		this.executionId.write(out);
-		out.writeInt(subtask);
-		out.writeInt(this.profilingInterval);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof VertexProfilingEvent) {
-			final VertexProfilingEvent other = (VertexProfilingEvent) obj;
-			
-			return super.equals(other) && this.subtask == other.subtask &&
-					this.profilingInterval == other.profilingInterval &&
-					this.vertexId.equals(other.vertexId) &&
-					this.executionId.equals(other.executionId);
-		}
-		else {
-			return false;
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return super.hashCode() ^ vertexId.hashCode() ^ (31*subtask) ^ executionId.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 15a9446..75463fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -53,11 +53,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
-import org.slf4j.LoggerFactory
-
 import akka.actor._
 
 import scala.concurrent._
@@ -98,7 +95,6 @@ class JobManager(val flinkConfiguration: Configuration,
                  val libraryCacheManager: BlobLibraryCacheManager,
                  val archive: ActorRef,
                  val accumulatorManager: AccumulatorManager,
-                 val profiler: Option[ActorRef],
                  val defaultExecutionRetries: Int,
                  val delayBetweenRetries: Long,
                  val timeout: FiniteDuration)
@@ -124,7 +120,6 @@ class JobManager(val flinkConfiguration: Configuration,
     }
 
     archive ! PoisonPill
-    profiler.foreach( ref => ref ! PoisonPill )
 
     for((e,_) <- currentJobs.values) {
       e.fail(new Exception("The JobManager is shutting down."))
@@ -935,7 +930,7 @@ object JobManager {
 
   /**
    * Create the job manager components as (instanceManager, scheduler, libraryCacheManager,
-   *              archiverProps, accumulatorManager, profiler, defaultExecutionRetries,
+   *              archiverProps, accumulatorManager, defaultExecutionRetries,
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
@@ -943,15 +938,13 @@ object JobManager {
    */
   def createJobManagerComponents(configuration: Configuration) :
     (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
-      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = {
+      Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
     val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
 
-    val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, false)
-
     val cleanupInterval = configuration.getLong(
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
@@ -978,12 +971,6 @@ object JobManager {
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 
-    val profilerProps: Option[Props] = if (profilingEnabled) {
-      Some(Props(classOf[JobManagerProfiler]))
-    } else {
-      None
-    }
-
     val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
 
     var blobServer: BlobServer = null
@@ -1018,7 +1005,7 @@ object JobManager {
     }
 
     (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-      profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount)
+      executionRetries, delayBetweenRetries, timeout, archiveCount)
   }
 
   /**
@@ -1052,12 +1039,9 @@ object JobManager {
                             archiverActorName: Option[String]): (ActorRef, ActorRef) = {
 
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-      profilerProps, executionRetries, delayBetweenRetries,
+      executionRetries, delayBetweenRetries,
       timeout, _) = createJobManagerComponents(configuration)
 
-    val profiler: Option[ActorRef] =
-                 profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) )
-
     // start the archiver wither with the given name, or without (avoid name conflicts)
     val archiver: ActorRef = archiverActorName match {
       case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
@@ -1065,7 +1049,7 @@ object JobManager {
     }
 
     val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
-        libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+        libraryCacheManager, archiver, accumulatorManager, executionRetries,
         delayBetweenRetries, timeout)
 
     val jobManager: ActorRef = jobMangerActorName match {

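For callers of createJobManagerComponents, the visible effect of this change is that the returned tuple loses the Option[Props] for the profiler and the JobManager constructor loses the profiler ActorRef. A hypothetical call-site sketch against the post-commit signature (object name is made up):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.jobmanager.JobManager

    // Hypothetical call site: destructure the reduced component tuple returned
    // after this commit; the profiler Props option is gone.
    object JobManagerComponentsSketch {

      def build(configuration: Configuration) = {
        val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
          executionRetries, delayBetweenRetries, timeout, archiveCount) =
          JobManager.createJobManagerComponents(configuration)

        // hand back whichever components the caller actually needs
        (instanceManager, scheduler, archiveCount)
      }
    }
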
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
deleted file mode 100644
index dd3a1b7..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.Actor
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
-import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData}
-
-import scala.collection.convert.WrapAsScala
-
-/**
- * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
- */
-class JobManagerProfiler
-  extends Actor
-  with ActorLogMessages
-  with ActorSynchronousLogging
-  with WrapAsScala {
-  override def receiveWithLogMessages: Receive = {
-    case ReportProfilingData(profilingContainer) =>
-      profilingContainer.getIterator foreach {
-        case x: InternalExecutionVertexThreadProfilingData =>
-          log.info(s"Received InternalExecutionVertexThreadProfilingData $x.")
-        case x: InternalInstanceProfilingData =>
-          log.info(s"Received InternalInstanceProfilingData $x.")
-        case x =>
-          log.error(s"Received unknown profiling data: ${x.getClass.getName}" )
-      }
-  }
-
-  /**
-   * Handle unmatched messages with an exception.
-   */
-  override def unhandled(message: Any): Unit = {
-    // let the actor crash
-    throw new RuntimeException("Received unknown message " + message)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
deleted file mode 100644
index 85e0fc3..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
-
-/**
- * This object contains the job manager profiler messages
- */
-object JobManagerProfilerMessages {
-
-  /**
-   * Reports profiling data to the profiler.
-   * @param profilingDataContainer
-   */
-  case class ReportProfilingData(profilingDataContainer: ProfilingDataContainer)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
deleted file mode 100644
index 0662d48..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-object TaskManagerProfilerMessages {
-
-  /**
-   * Requests to monitor the specified [[task]].
-   *
-   * @param task
-   */
-  case class MonitorTask(task: Task)
-
-  /**
-   * Requests to unmonitor the task associated to [[executionID]].
-   *
-   * @param executionID
-   */
-  case class UnmonitorTask(executionID: ExecutionAttemptID)
-
-  /**
-   * Registers the sender as a profiling event listener.
-   */
-  case object RegisterProfilingListener
-
-  /**
-   * Unregisters the sender as a profiling event listener.
-   */
-  case object UnregisterProfilingListener
-
-  /**
-   * Makes the task manager profiling the running tasks.
-   */
-  case object ProfileTasks
-
-}

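Before this commit, a profiling consumer registered itself with the TaskManagerProfiler via these messages and then received ReportProfilingData updates. A hypothetical listener sketch; it only compiles against the pre-commit tree, since the message types are removed here:

    import akka.actor.{Actor, ActorLogging, ActorRef}
    import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
    import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{RegisterProfilingListener, UnregisterProfilingListener}

    // Hypothetical listener (not part of the codebase) that subscribes to the
    // removed profiler protocol for its own lifetime.
    class ProfilingListenerSketch(profiler: ActorRef) extends Actor with ActorLogging {

      override def preStart(): Unit = profiler ! RegisterProfilingListener

      override def postStop(): Unit = profiler ! UnregisterProfilingListener

      def receive: Receive = {
        case ReportProfilingData(container) =>
          // container is a ProfilingDataContainer; iterate over its entries as needed
          log.info(s"Received profiling container $container")
      }
    }
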
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
deleted file mode 100644
index f0079f8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import java.lang.management.ManagementFactory
-import java.util.concurrent.TimeUnit
-
-import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.JobVertexID
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
-import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._
-import org.apache.flink.runtime.profiling.ProfilingException
-import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
-import org.apache.flink.runtime.profiling.impl.{EnvironmentThreadSet, InstanceProfiler}
-
-import scala.concurrent.duration.FiniteDuration
-
-/**
- * Actor which is responsible for profiling task threads on the [[TaskManager]]. The monitoring
- * is triggered by the self-addressed message [[ProfileTasks]] which is scheduled to be sent
- * repeatedly.
- *
- * @param instancePath Akka URL to [[TaskManager]] instance
- * @param reportInterval Interval of profiling action
- */
-class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
-   extends Actor with ActorLogMessages with ActorSynchronousLogging {
-
-  import context.dispatcher
-
-  val tmx = ManagementFactory.getThreadMXBean
-  val instanceProfiler = new InstanceProfiler(instancePath)
-  val listeners = scala.collection.mutable.Set[ActorRef]()
-  val environments = scala.collection.mutable.HashMap[ExecutionAttemptID, RuntimeEnvironment]()
-  val monitoredThreads = scala.collection.mutable.HashMap[RuntimeEnvironment,
-    EnvironmentThreadSet]()
-
-  var monitoringScheduler: Option[Cancellable] = None
-
-  if (tmx.isThreadContentionMonitoringSupported) {
-    tmx.setThreadContentionMonitoringEnabled(true)
-  } else {
-    throw new ProfilingException("The thread contention monitoring is not supported.")
-  }
-
-
-  override def receiveWithLogMessages: Receive = {
-    case MonitorTask(task) =>
-      task.registerExecutionListener(self)
-      environments += task.getExecutionId -> task.getEnvironment
-
-    case UnmonitorTask(executionAttemptID) =>
-      environments.remove(executionAttemptID)
-
-    case RegisterProfilingListener =>
-      listeners += sender
-      if (monitoringScheduler.isEmpty) {
-        startMonitoring()
-      }
-
-    case UnregisterProfilingListener =>
-      listeners -= sender
-      if (listeners.isEmpty) {
-        stopMonitoring()
-      }
-
-    case ProfileTasks =>
-      val timestamp = System.currentTimeMillis()
-
-      val profilingDataContainer = new ProfilingDataContainer()
-
-      for ((env, set) <- monitoredThreads) {
-        val threadProfilingData = set.captureCPUUtilization(env.getJobID, tmx, timestamp)
-
-        if (threadProfilingData != null) {
-          profilingDataContainer.addProfilingData(threadProfilingData)
-        }
-
-        if (monitoredThreads.nonEmpty) {
-          val instanceProfilingData = try {
-            Some(instanceProfiler.generateProfilingData(timestamp))
-          } catch {
-            case e: ProfilingException =>
-              log.error("Error while retrieving instance profiling data.", e)
-              None
-          }
-
-          instanceProfilingData foreach {
-            profilingDataContainer.addProfilingData(_)
-          }
-
-          if (!profilingDataContainer.isEmpty) {
-            for (listener <- listeners) {
-              listener ! ReportProfilingData(profilingDataContainer)
-            }
-          }
-
-          profilingDataContainer.clear()
-        }
-      }
-
-    case ExecutionStateChanged(_, vertexID, _, _, subtaskIndex, executionID, newExecutionState,
-    _, _) =>
-      import ExecutionState._
-
-      environments.get(executionID) match {
-        case Some(environment) =>
-          newExecutionState match {
-            case RUNNING => registerMainThreadForCPUProfiling(environment, vertexID,
-              subtaskIndex, executionID)
-            case FINISHED | CANCELING | CANCELED | FAILED =>
-              unregisterMainThreadFromCPUProfiling(environment)
-            case _ =>
-          }
-        case None =>
-          log.warn(s"Could not find environment for execution id $executionID.")
-      }
-  }
-
-  /**
-   * Handle unmatched messages with an exception.
-   */
-  override def unhandled(message: Any): Unit = {
-    // let the actor crash
-    throw new RuntimeException("Received unknown message " + message)
-  }
-
-  def startMonitoring(): Unit = {
-    val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
-    val delay = new FiniteDuration((reportInterval * Math.random()).toLong, TimeUnit.MILLISECONDS)
-
-    // schedule ProfileTasks message to be sent repeatedly to oneself
-    monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self,
-      ProfileTasks))
-  }
-
-  def stopMonitoring(): Unit = {
-    monitoringScheduler.foreach {
-      _.cancel()
-    }
-    monitoringScheduler = None
-  }
-
-  def registerMainThreadForCPUProfiling(environment: RuntimeEnvironment, vertexID: JobVertexID,
-                                        subtask: Int,
-                                        executionID: ExecutionAttemptID): Unit = {
-    monitoredThreads += environment -> new EnvironmentThreadSet(tmx,
-      environment.getExecutingThread, vertexID,
-      subtask, executionID)
-  }
-
-  def unregisterMainThreadFromCPUProfiling(environment: RuntimeEnvironment): Unit = {
-    monitoredThreads.remove(environment) match {
-      case Some(set) =>
-        if (set.getMainThread != environment.getExecutingThread) {
-          log.error(s"The thread ${environment.getExecutingThread.getName} is not the main thread" +
-            s" of this environment.")
-        }
-      case None =>
-    }
-  }
-}

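The removed TaskManagerProfiler drove its sampling with a self-addressed message scheduled at a fixed interval after a randomized initial delay. A hypothetical, self-contained Akka sketch of that scheduling pattern (class and message names are made up):

    import java.util.concurrent.TimeUnit

    import scala.concurrent.duration.FiniteDuration

    import akka.actor.{Actor, Cancellable}

    // Hypothetical actor showing the periodic self-message pattern used by the
    // removed TaskManagerProfiler.
    class PeriodicSamplerSketch(reportInterval: Long) extends Actor {

      import context.dispatcher  // execution context for the scheduler

      private case object Tick   // self-addressed trigger, like the removed ProfileTasks

      private var schedule: Option[Cancellable] = None

      override def preStart(): Unit = {
        val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
        // randomized initial delay spreads the sampling load across task managers
        val delay = new FiniteDuration((reportInterval * math.random).toLong, TimeUnit.MILLISECONDS)
        schedule = Some(context.system.scheduler.schedule(delay, interval, self, Tick))
      }

      override def postStop(): Unit = {
        schedule.foreach(_.cancel())
        schedule = None
      }

      def receive: Receive = {
        case Tick => ()  // take one profiling sample here
      }
    }
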
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
deleted file mode 100644
index 85a4a37..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling.impl;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.profiling.ProfilingException;
-import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(InstanceProfiler.class)
-public class InstanceProfilerTest {
-
-	@Mock
-	private InstanceConnectionInfo infoMock;
-
-	@Mock
-	private InetAddress addressMock;
-
-	@Mock
-	private BufferedReader cpuBufferMock;
-
-	@Mock
-	private BufferedReader networkBufferMock;
-
-	@Mock
-	private BufferedReader memoryBufferMock;
-
-	@Mock
-	private FileReader cpuReaderMock;
-
-	@Mock
-	private FileReader networkReaderMock;
-
-	@Mock
-	private FileReader memoryReaderMock;
-
-	// object under test
-	InstanceProfiler out;
-
-	@Before
-	public void setUp() throws Exception {
-		initMocks(this);
-		when(this.infoMock.address()).thenReturn(this.addressMock);
-		when(this.addressMock.getHostAddress()).thenReturn("192.168.1.1");
-
-		whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_STAT).thenReturn(this.cpuReaderMock);
-		whenNew(BufferedReader.class).withArguments(this.cpuReaderMock).thenReturn(this.cpuBufferMock);
-
-		when(this.cpuBufferMock.readLine()).thenReturn(
-			"cpu  222875 20767 209704 3782096 209864 0 1066 0 0 0"
-			);
-
-		whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_NET_DEV).thenReturn(this.networkReaderMock);
-		whenNew(BufferedReader.class).withArguments(this.networkReaderMock).thenReturn(this.networkBufferMock);
-
-		when(this.networkBufferMock.readLine())
-			.thenReturn(
-				"  eth0: 364729203  286442    0    0    0     0          0      1060 14483806  191563    0    0    0     0       0          0",
-				(String) null,
-				"  eth0: 364729203  286442    0    0    0     0          0      1060 14483806  191563    0    0    0     0       0          0",
-				(String) null,
-				"  eth0: 364729203  286442    0    0    0     0          0      1060 14483806  191563    0    0    0     0       0          0",
-				(String) null
-			);
-
-		whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_MEMINFO).thenReturn(this.memoryReaderMock);
-		whenNew(BufferedReader.class).withArguments(this.memoryReaderMock).thenReturn(this.memoryBufferMock);
-
-		when(this.memoryBufferMock.readLine()).thenReturn(
-			"MemTotal:        8052956 kB",
-			"MemFree:         3999880 kB",
-			"Buffers:           77216 kB",
-			"Cached:          1929640 kB",
-			null,
-			"MemTotal:        8052956 kB",
-			"MemFree:         3999880 kB",
-			"Buffers:           77216 kB",
-			"Cached:          1929640 kB",
-			null,
-			"MemTotal:        8052956 kB",
-			"MemFree:         3999880 kB",
-			"Buffers:           77216 kB",
-			"Cached:          1929640 kB",
-			null
-			);
-
-		PowerMockito.mockStatic(System.class);
-		when(System.currentTimeMillis()).thenReturn(0L);
-
-		this.out = new InstanceProfiler("InstanceProfilerTest");
-	}
-
-	@Test
-	public void shouldHaveNetworkTraffic() {
-
-		try {
-			final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
-			assertEquals(0L, generateProfilingData.getReceivedBytes());
-			assertEquals(0L, generateProfilingData.getTransmittedBytes());
-		} catch (ProfilingException e) {
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void shouldHaveMemSettingsMeasured() {
-
-		try {
-			final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
-
-			final long totalMemory = generateProfilingData.getTotalMemory();
-			assertThat(totalMemory, is(equalTo(8052956L)));
-
-			long freeMemory = generateProfilingData.getFreeMemory();
-			assertThat(freeMemory, is(equalTo(3999880L)));
-
-			long buffers = generateProfilingData.getBufferedMemory();
-			assertThat(buffers, is(equalTo(77216L)));
-
-			long cached = generateProfilingData.getCachedMemory();
-			assertThat(cached, is(equalTo(1929640L)));
-		} catch (ProfilingException e) {
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void shouldMeasureCPUUtilization() {
-
-		try {
-			final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
-
-			assertEquals(0L, generateProfilingData.getUserCPU());
-			assertEquals(0L, generateProfilingData.getIdleCPU());
-			assertEquals(0L, generateProfilingData.getSystemCPU());
-			assertEquals(0L, generateProfilingData.getHardIrqCPU());
-			assertEquals(0L, generateProfilingData.getSoftIrqCPU());
-			assertEquals(0L, generateProfilingData.getIOWaitCPU());
-		} catch (ProfilingException e) {
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
deleted file mode 100644
index c3c75ce..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.ManagementTestUtils;
-import org.junit.Test;
-
-/**
- * This test checks the proper serialization and deserialization of profiling events.
- */
-public class ProfilingTypesTest {
-
-	private static final int PROFILING_INTERVAL = 4321;
-
-	private static final int IOWAIT_CPU = 10;
-
-	private static final int IDLE_CPU = 11;
-
-	private static final int USER_CPU = 12;
-
-	private static final int SYSTEM_CPU = 13;
-
-	private static final int HARD_IRQ_CPU = 14;
-
-	private static final int SOFT_IRQ_CPU = 15;
-
-	private static final long TOTAL_MEMORY = 10001L;
-
-	private static final long FREE_MEMORY = 10002L;
-
-	private static final long BUFFERED_MEMORY = 10003L;
-
-	private static final long CACHED_MEMORY = 10004L;
-
-	private static final long CACHED_SWAP_MEMORY = 10005L;
-
-	private static final long RECEIVED_BYTES = 100006L;
-
-	private static final long TRANSMITTED_BYTES = 100007L;
-
-	private static final long TIMESTAMP = 100008L;
-
-	private static final long PROFILING_TIMESTAMP = 100009L;
-
-	private static final String INSTANCE_NAME = "Test Instance";
-
-	private static final int USER_TIME = 17;
-
-	private static final int SYSTEM_TIME = 18;
-
-	private static final int BLOCKED_TIME = 19;
-
-	private static final int WAITED_TIME = 20;
-
-	/**
-	 * Tests serialization/deserialization for {@link InstanceSummaryProfilingEvent}.
-	 */
-	@Test
-	public void testInstanceSummaryProfilingEvent() {
-
-		final InstanceSummaryProfilingEvent orig = new InstanceSummaryProfilingEvent(PROFILING_INTERVAL, IOWAIT_CPU,
-			IDLE_CPU, USER_CPU, SYSTEM_CPU, HARD_IRQ_CPU, SOFT_IRQ_CPU, TOTAL_MEMORY, FREE_MEMORY, BUFFERED_MEMORY,
-			CACHED_MEMORY, CACHED_SWAP_MEMORY, RECEIVED_BYTES, TRANSMITTED_BYTES, new JobID(), TIMESTAMP,
-			PROFILING_TIMESTAMP);
-
-		final InstanceSummaryProfilingEvent copy = (InstanceSummaryProfilingEvent) ManagementTestUtils.createCopy(orig);
-
-		assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
-		assertEquals(orig.getIOWaitCPU(), copy.getIOWaitCPU());
-		assertEquals(orig.getIdleCPU(), copy.getIdleCPU());
-		assertEquals(orig.getUserCPU(), copy.getUserCPU());
-		assertEquals(orig.getSystemCPU(), copy.getSystemCPU());
-		assertEquals(orig.getHardIrqCPU(), copy.getHardIrqCPU());
-		assertEquals(orig.getSoftIrqCPU(), copy.getSoftIrqCPU());
-		assertEquals(orig.getTotalMemory(), copy.getTotalMemory());
-		assertEquals(orig.getFreeMemory(), copy.getFreeMemory());
-		assertEquals(orig.getBufferedMemory(), copy.getBufferedMemory());
-		assertEquals(orig.getCachedMemory(), copy.getCachedMemory());
-		assertEquals(orig.getCachedSwapMemory(), copy.getCachedSwapMemory());
-		assertEquals(orig.getReceivedBytes(), copy.getReceivedBytes());
-		assertEquals(orig.getTransmittedBytes(), copy.getTransmittedBytes());
-		assertEquals(orig.getJobID(), copy.getJobID());
-		assertEquals(orig.getTimestamp(), copy.getTimestamp());
-		assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
-		assertEquals(orig.hashCode(), copy.hashCode());
-		assertTrue(orig.equals(copy));
-	}
-
-	/**
-	 * Tests serialization/deserialization for {@link SingleInstanceProfilingEvent}.
-	 */
-	@Test
-	public void testSingleInstanceProfilingEvent() {
-		try {
-			final SingleInstanceProfilingEvent orig = new SingleInstanceProfilingEvent(PROFILING_INTERVAL, IOWAIT_CPU,
-				IDLE_CPU, USER_CPU, SYSTEM_CPU, HARD_IRQ_CPU, SOFT_IRQ_CPU, TOTAL_MEMORY, FREE_MEMORY, BUFFERED_MEMORY,
-				CACHED_MEMORY, CACHED_SWAP_MEMORY, RECEIVED_BYTES, TRANSMITTED_BYTES, new JobID(), TIMESTAMP,
-				PROFILING_TIMESTAMP, INSTANCE_NAME);
-	
-			final SingleInstanceProfilingEvent copy = (SingleInstanceProfilingEvent) CommonTestUtils.createCopyWritable(orig);
-	
-			assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
-			assertEquals(orig.getIOWaitCPU(), copy.getIOWaitCPU());
-			assertEquals(orig.getIdleCPU(), copy.getIdleCPU());
-			assertEquals(orig.getUserCPU(), copy.getUserCPU());
-			assertEquals(orig.getSystemCPU(), copy.getSystemCPU());
-			assertEquals(orig.getHardIrqCPU(), copy.getHardIrqCPU());
-			assertEquals(orig.getSoftIrqCPU(), copy.getSoftIrqCPU());
-			assertEquals(orig.getTotalMemory(), copy.getTotalMemory());
-			assertEquals(orig.getFreeMemory(), copy.getFreeMemory());
-			assertEquals(orig.getBufferedMemory(), copy.getBufferedMemory());
-			assertEquals(orig.getCachedMemory(), copy.getCachedMemory());
-			assertEquals(orig.getCachedSwapMemory(), copy.getCachedSwapMemory());
-			assertEquals(orig.getReceivedBytes(), copy.getReceivedBytes());
-			assertEquals(orig.getTransmittedBytes(), copy.getTransmittedBytes());
-			assertEquals(orig.getJobID(), copy.getJobID());
-			assertEquals(orig.getTimestamp(), copy.getTimestamp());
-			assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
-			assertEquals(orig.getInstanceName(), copy.getInstanceName());
-			assertEquals(orig.hashCode(), copy.hashCode());
-			assertTrue(orig.equals(copy));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-	}
-
-	/**
-	 * Tests serialization/deserialization for {@link ThreadProfilingEvent}.
-	 */
-	@Test
-	public void testThreadProfilingEvent() {
-
-		final ThreadProfilingEvent orig = new ThreadProfilingEvent(USER_TIME, SYSTEM_TIME, BLOCKED_TIME, WAITED_TIME,
-			new JobVertexID(), 17, new ExecutionAttemptID(), PROFILING_INTERVAL, new JobID(), TIMESTAMP, PROFILING_TIMESTAMP);
-
-		final ThreadProfilingEvent copy = (ThreadProfilingEvent) ManagementTestUtils.createCopy(orig);
-
-		assertEquals(orig.getUserTime(), copy.getUserTime());
-		assertEquals(orig.getSystemTime(), copy.getSystemTime());
-		assertEquals(orig.getBlockedTime(), copy.getBlockedTime());
-		assertEquals(orig.getWaitedTime(), copy.getWaitedTime());
-		assertEquals(orig.getVertexID(), copy.getVertexID());
-		assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
-		assertEquals(orig.getJobID(), copy.getJobID());
-		assertEquals(orig.getTimestamp(), copy.getTimestamp());
-		assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
-		assertEquals(orig.hashCode(), copy.hashCode());
-		assertTrue(orig.equals(copy));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index f87e151..9f9fe93 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -49,7 +49,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
 
   override def startJobManager(actorSystem: ActorSystem): ActorRef = {
 
-    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _,
+    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
     executionRetries, delayBetweenRetries,
     timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
 
@@ -57,7 +57,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+      libraryCacheManager, archive, accumulatorManager, executionRetries,
       delayBetweenRetries, timeout) with TestingJobManager)
 
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index ddfffee..05093b5 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -71,7 +71,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
 
   override def startJobManager(actorSystem: ActorSystem): ActorRef = {
 
-    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _,
+    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
     executionRetries, delayBetweenRetries,
     timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
 
@@ -79,7 +79,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+      libraryCacheManager, archive, accumulatorManager, executionRetries,
       delayBetweenRetries, timeout) with TestingJobManager)
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)

http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 06e16dc..7884edd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.slf4j.LoggerFactory
+
 
 import scala.io.Source
 
@@ -228,18 +228,14 @@ object ApplicationMaster {
     // start all the components inside the job manager
     LOG.debug("Starting JobManager components")
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-                   profilerProps, executionRetries, delayBetweenRetries,
+                   executionRetries, delayBetweenRetries,
                    timeout, _) = JobManager.createJobManagerComponents(configuration)
 
-    // start the profiler, if needed
-    val profiler: Option[ActorRef] =
-      profilerProps.map( props => jobManagerSystem.actorOf(props, JobManager.PROFILER_NAME) )
-
     // start the archiver
     val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
-      libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+      libraryCacheManager, archiver, accumulatorManager, executionRetries,
       delayBetweenRetries, timeout) with ApplicationMasterActor)
 
     LOG.debug("Starting JobManager actor")