Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2015/10/20 10:47:52 UTC

[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1270

    [FLINK-2869] [tests] Apply JMH on IOManagerPerformanceBenchmark class.

    JMH is a Java harness for building, running, and analysing nano/micro/milli/macro benchmarks. This change replaces the old hand-written micro-benchmark approach with JMH in order to get more accurate results. It modifies the `IOManagerPerformanceBenchmark` class and moves it to the `flink-benchmark` module.
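
For context, here is a minimal sketch of the kind of hand-rolled timing loop that a harness such as JMH is meant to replace. It is not code from this pull request: the class `NaiveTimingSketch` and its methods `writeInts` and `timeOnceNanos` are hypothetical names, and the workload only writes to memory. With no warm-up and no forking, the reported times can vary widely between runs, for example because of JIT compilation.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class NaiveTimingSketch {

    // Hypothetical workload: writes n ints to an in-memory stream
    // and returns the number of bytes written.
    public static int writeInts(int n) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(n * 4);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < n; i++) {
                out.writeInt(i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Hand-rolled measurement: a single timed run, no warm-up, no forking.
    public static long timeOnceNanos(int n) {
        long start = System.nanoTime();
        int written = writeInts(n);
        long elapsed = System.nanoTime() - start;
        // Use the result so the work cannot be discarded as dead code.
        if (written != n * 4) {
            throw new IllegalStateException("unexpected byte count: " + written);
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Print a few consecutive measurements to compare them by eye.
        for (int run = 0; run < 5; run++) {
            System.out.println("run " + run + ": " + timeOnceNanos(1_000_000) + " ns");
        }
    }
}
```

JMH handles warm-up iterations, forked JVMs, and result aggregation itself, which is the motivation given above for the switch.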

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink IOManagerPerformanceBenchmark

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1270.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1270
    
----
commit dfcda0bc80fc9aaf01bfb73fb63b1e9351b8986a
Author: gallenvara <ga...@126.com>
Date:   2015-10-20T01:49:34Z

    [FLINK-2869] [tests] Apply JMH on IOManagerPerformanceBenchmark class.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1270#discussion_r43203880
  
    --- Diff: flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.benchmark.runtime.io.disk.iomanager;
    +
    +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
    +import org.apache.flink.runtime.io.disk.iomanager.*;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.DummyInvokable;
    +import org.apache.flink.types.IntValue;
    +import org.junit.Assert;
    +import org.openjdk.jmh.annotations.*;
    +import org.openjdk.jmh.runner.Runner;
    +import org.openjdk.jmh.runner.options.Options;
    +import org.openjdk.jmh.runner.options.OptionsBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.*;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.FileChannel;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +@State(Scope.Thread)
    +@BenchmarkMode(Mode.AverageTime)
    +@OutputTimeUnit(TimeUnit.MILLISECONDS)
    +public class IOManagerPerformanceBenchmark {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
    +
    +	@Param({"4096", "16384", "524288"})
    +	private int segmentSizesAligned;
    +
    +	@Param({"3862", "16895", "500481"})
    +	private int segmentSizesUnaligned;
    +
    +	@Param({"1", "2", "4", "6"})
    +	private int numSegment;
    +
    +	private static int numBlocks;
    +
    +	private static final long MEMORY_SIZE = 32 * 1024 * 1024;
    +
    +	private static final int NUM_INTS_WRITTEN = 100000000;
    +
    +	private static final AbstractInvokable memoryOwner = new DummyInvokable();
    +
    +	private MemoryManager memManager;
    +
    +	private IOManager ioManager;
    +	
    +	private static FileIOChannel.ID fileIOChannel;
    +	
    +	private static File ioManagerTempFile1;
    +	
    +	private static File ioManagerTempFile2;
    +	
    +	private static File speedTestNIOTempFile1;
    +	
    +	private static File speedTestNIOTempFile2;
    +	
    +	private static File speedTestNIOTempFile3;
    +	
    +	private static File speedTestNIOTempFile4;
    +
    +
    +	@Setup
    +	public void startup() throws Exception {
    +		memManager = new MemoryManager(MEMORY_SIZE, 1);
    +		ioManager = new IOManagerAsync();
    +		this.testChannelWriteWithSegments(numSegment);
    +		ioManagerTempFile1 = this.createReadTempFile(segmentSizesAligned);
    +		ioManagerTempFile2 = this.createReadTempFile(segmentSizesUnaligned);
    +		speedTestNIOTempFile1 = creatSpeedTestNIOTempFile(segmentSizesAligned, true);
    +		speedTestNIOTempFile2 = creatSpeedTestNIOTempFile(segmentSizesAligned, false);
    +		speedTestNIOTempFile3 = creatSpeedTestNIOTempFile(segmentSizesUnaligned, true);
    +		speedTestNIOTempFile4 = creatSpeedTestNIOTempFile(segmentSizesUnaligned, false);
    +		
    +	}
    +
    +	@TearDown
    +	public void afterTest() throws Exception {
    +		ioManager.shutdown();
    +		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
    +
    +		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memManager.verifyEmpty());
    +		memManager.shutdown();
    +		memManager = null;
    +	}
    +
    +// ------------------------------------------------------------------------
    +
    +	private File createReadTempFile(int bufferSize) throws IOException {
    +		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
    +		final IntValue rec = new IntValue(0);
    +
    +		File tempFile = null;
    +		DataOutputStream daos = null;
    +
    +		try {
    +			tempFile = new File(tmpChannel.getPath());
    +
    +			FileOutputStream fos = new FileOutputStream(tempFile);
    +			daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize));
    +
    +			int valsLeft = NUM_INTS_WRITTEN;
    +			while (valsLeft-- > 0) {
    +				rec.setValue(valsLeft);
    +				rec.write(new OutputViewDataOutputStreamWrapper(daos));
    +			}
    +			daos.close();
    +			daos = null;
    +		}
    +		finally {
    +			// close if possible
    +			if (daos != null) {
    +				daos.close();
    +			}
    +		}
    +		return tempFile;
    +	}
    +
    +	@SuppressWarnings("resource")
    +	private File creatSpeedTestNIOTempFile(int bufferSize, boolean direct) throws IOException
    +	{
    +		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
    +
    +		File tempFile = null;
    +		FileChannel fs = null;
    +
    +		try {
    +			tempFile = new File(tmpChannel.getPath());
    +
    +			RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
    +			fs = raf.getChannel();
    +
    +			ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
    +
    +			int valsLeft = NUM_INTS_WRITTEN;
    +			while (valsLeft-- > 0) {
    +				if (buf.remaining() < 4) {
    +					buf.flip();
    +					fs.write(buf);
    +					buf.clear();
    +				}
    +				buf.putInt(valsLeft);
    +			}
    +
    +			if (buf.position() > 0) {
    +				buf.flip();
    +				fs.write(buf);
    +			}
    +
    +			fs.close();
    +			raf.close();
    +			fs = null;
    +		}
    +		finally {
    +			// close if possible
    +			if (fs != null) {
    +				fs.close();
    +				fs = null;
    +			}
    +		}
    +		return tempFile;
    +	}
    +	
    +	@Benchmark
    +	public void speedTestOutputManager() throws Exception
    +	{
    +		LOG.info("Starting speed test with IO Manager...");
    +		
    +		testChannelWriteWithSegments(numSegment);
    +	}
    +
    +	@Benchmark
    +	public void speedTestInputManager() throws Exception
    +	{
    +		LOG.info("Starting speed test with IO Manager...");
    +		
    +		testChannelReadWithSegments(numSegment);
    +	}
    +
    +	private void testChannelWriteWithSegments(int numSegments) throws Exception
    +	{
    +		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
    +		final FileIOChannel.ID channel = this.ioManager.createChannel();
    +
    +		BlockChannelWriter<MemorySegment> writer = null;
    +
    +		try {
    +			writer = this.ioManager.createBlockChannelWriter(channel);
    +			final ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
    +
    +			int valsLeft = NUM_INTS_WRITTEN;
    +			while (valsLeft-- > 0) {
    +				out.writeInt(valsLeft);
    +			}
    +
    +			fileIOChannel = channel;
    +			out.close();
    +			numBlocks = out.getBlockCount();
    +			
    +			writer.close();
    +			writer = null;
    +
    +			memManager.release(memory);
    +		}
    +		finally {
    +			if (writer != null) {
    +				writer.closeAndDelete();
    +			}
    +		}
    +	}
    +
    +	private void testChannelReadWithSegments(int numSegments) throws Exception
    +	{
    +		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
    +		//final FileIOChannel.ID channel = this.ioManager.createChannel();
    --- End diff --
    
    Remove this commented-out line?



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1270#issuecomment-151773654
  
    Thanks for the update.
    Looks good to merge.



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1270#discussion_r43212327
  
    --- Diff: flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---
    @@ -0,0 +1,613 @@
    +	@Setup
    +	public void startup() throws Exception {
    +		memManager = new MemoryManager(MEMORY_SIZE, 1);
    +		ioManager = new IOManagerAsync();
    +		this.testChannelWriteWithSegments(numSegment);
    +		ioManagerTempFile1 = this.createReadTempFile(segmentSizesAligned);
    --- End diff --
    
    My mistake, I forgot to delete them.
    I have now added the deletions to the `@TearDown` method and submitted a new commit.
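
For readers following along, a minimal sketch of the cleanup pattern being discussed. It is not the code from the new commit: `TempFileCleanupSketch`, `createTempFile`, and `tearDown` are hypothetical names, and the JMH `@Setup`/`@TearDown` annotations are only mentioned in comments so the example needs nothing beyond the JDK.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class TempFileCleanupSketch {

    // Every file created during setup is remembered here.
    private final List<File> tempFiles = new ArrayList<>();

    // Stand-in for work done in a @Setup method: create a temp file and track it.
    public File createTempFile(String prefix) {
        try {
            File f = File.createTempFile(prefix, ".tmp");
            tempFiles.add(f);
            return f;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for a @TearDown method: delete all tracked files.
    public void tearDown() {
        for (File f : tempFiles) {
            if (f.exists() && !f.delete()) {
                // Assumption: fall back to deletion at JVM exit if delete() fails.
                f.deleteOnExit();
            }
        }
        tempFiles.clear();
    }

    public static void main(String[] args) {
        TempFileCleanupSketch sketch = new TempFileCleanupSketch();
        File a = sketch.createTempFile("iomanager");
        File b = sketch.createTempFile("nio");
        sketch.tearDown();
        System.out.println("still present: " + a.exists() + ", " + b.exists());
    }
}
```

Tracking the files in a list keeps the teardown independent of how many temp files the setup creates.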



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1270#issuecomment-152122559
  
    I will merge this PR.



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1270



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1270#discussion_r43203668
  
    --- Diff: flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---
    @@ -0,0 +1,613 @@
    +	@SuppressWarnings("resource")
    +	private File creatSpeedTestNIOTempFile(int bufferSize, boolean direct) throws IOException
    --- End diff --
    
    should be creat**e**SpeedTestNIOTempFile



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1270#discussion_r43204232
  
    --- Diff: flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---
    @@ -0,0 +1,613 @@
    +	@Setup
    +	public void startup() throws Exception {
    +		memManager = new MemoryManager(MEMORY_SIZE, 1);
    +		ioManager = new IOManagerAsync();
    +		this.testChannelWriteWithSegments(numSegment);
    +		ioManagerTempFile1 = this.createReadTempFile(segmentSizesAligned);
    --- End diff --
    
    Are these files deleted after the benchmark completes?



[GitHub] flink pull request: [FLINK-2869] [tests] Apply JMH on IOManagerPer...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1270#issuecomment-149765941
  
    Part of the JMH benchmark results:
    ![jmhiomanagerbenchmark](https://cloud.githubusercontent.com/assets/12931563/10626358/a82ce376-77e0-11e5-9fb6-a604167db867.PNG)


