Posted to issues@flink.apache.org by HuangWHWHW <gi...@git.apache.org> on 2015/08/05 05:04:47 UTC

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/991

    [FLINK-2480][test]Add tests for PrintSinkFunction

    Test PrintSinkFunction:
    Set the number of subtasks to 0 in the runtime context so that the prefix is null.
    1. Set the target to STD.OUT and test printing to System.out.
    2. Set the target to STD.ERR and test printing to System.err.
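
The testing approach described above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `RecordingStreamSketch`, `RecordingPrintStream`, `emit`, `captureOut`, and `captureErr` are hypothetical names, and `emit` is only a stand-in for a sink that prints a record; it does not use Flink's `PrintSinkFunction`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical sketch: record what is printed to System.out / System.err.
class RecordingStreamSketch {

    /** Remembers the last line passed to println(String) instead of writing it. */
    static class RecordingPrintStream extends PrintStream {
        String lastLine;

        RecordingPrintStream() {
            // The underlying stream is irrelevant because println is overridden.
            super(new ByteArrayOutputStream());
        }

        @Override
        public void println(String line) {
            this.lastLine = line;
        }
    }

    /** Stand-in for a sink (an assumption): prints the record to stdout or stderr. */
    static void emit(String record, boolean toStdErr) {
        PrintStream target = toStdErr ? System.err : System.out;
        target.println(record);
    }

    /** Redirects System.out, emits one record, and restores the original stream. */
    static String captureOut(String record) {
        PrintStream original = System.out;
        RecordingPrintStream recorder = new RecordingPrintStream();
        System.setOut(recorder);
        try {
            emit(record, false);
        } finally {
            System.setOut(original);
        }
        return recorder.lastLine;
    }

    /** Same as captureOut, but for System.err. */
    static String captureErr(String record) {
        PrintStream original = System.err;
        RecordingPrintStream recorder = new RecordingPrintStream();
        System.setErr(recorder);
        try {
            emit(record, true);
        } finally {
            System.setErr(original);
        }
        return recorder.lastLine;
    }
}
```

Restoring the original stream in a `finally` block keeps a failing assertion in one test from leaving the redirected stream in place for later tests.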

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2480

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #991
    
----
commit cfd883deacfdd3bcbfc7b5da6f4ce23d2d21c04f
Author: HuangWHWHW <40...@qq.com>
Date:   2015-08-05T02:55:58Z

    [FLINK-2480][test]Add tests for PrintSinkFunction

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37961564
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,330 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return new Map<String, Future<Path>>() {
    +				@Override
    +				public int size() {
    +					return 0;
    +				}
    +
    +				@Override
    +				public boolean isEmpty() {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsKey(Object key) {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsValue(Object value) {
    +					return false;
    +				}
    +
    +				@Override
    +				public Future<Path> get(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> put(String key, Future<Path> value) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> remove(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public void putAll(Map<? extends String, ? extends Future<Path>> m) {
    +
    +				}
    +
    +				@Override
    +				public void clear() {
    +
    +				}
    +
    +				@Override
    +				public Set<String> keySet() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Collection<Future<Path>> values() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Set<Entry<String, Future<Path>>> entrySet() {
    +					return null;
    +				}
    +			};
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				this.envForPrefixNull,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    +		assertEquals("hello world!", stream.result);
    +
    +		printSink.close();
    +	}
    +
    +	@Test
    +	public void testPrintSinkStdErr(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				this.envForPrefixNull,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardErr();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.err", printSink.toString());
    +		assertEquals("hello world!", stream.result);
    +
    +		printSink.close();
    +	}
    +
    +	@Override
    +	public void invoke(IN record) {
    +
    +	}
    +
    +	@After
    +	public void restoreSystemOut() {
    +		System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out)));
    --- End diff --
    
    OK, I will add this in my next change.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-134566835
  
    @mxm 
    Thank you.
    Sorry, I haven't updated the code in this branch for a long time.
    So you may want to wait for the CI to pass.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-135360612
  
    Thank you @HuangWHWHW. I merged your pull request with a few minor changes (see new comments).



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37962927
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return new Map<String, Future<Path>>() {
    +				@Override
    +				public int size() {
    +					return 0;
    +				}
    +
    +				@Override
    +				public boolean isEmpty() {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsKey(Object key) {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsValue(Object value) {
    +					return false;
    +				}
    +
    +				@Override
    +				public Future<Path> get(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> put(String key, Future<Path> value) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> remove(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public void putAll(Map<? extends String, ? extends Future<Path>> m) {
    +
    +				}
    +
    +				@Override
    +				public void clear() {
    +
    +				}
    +
    +				@Override
    +				public Set<String> keySet() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Collection<Future<Path>> values() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Set<Entry<String, Future<Path>>> entrySet() {
    +					return null;
    +				}
    +			};
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	public PrintStream PrintStreamOriginal = null;
    --- End diff --
    
    This should be `private PrintStream PrintStreamOriginal = System.out;`. Then you don't need the null check later on and simply restore it in the `@After` method.
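
The suggestion in this comment can be sketched without any test framework as follows. This is only an illustration under assumptions: `StdOutFixtureSketch`, `redirect`, `restore`, and `captured` are hypothetical names, and in a JUnit 4 test class `restore()` would be the method annotated with `@After`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical fixture: save System.out in a field initializer, restore it later.
class StdOutFixtureSketch {

    // Captured when the fixture is created, so it is never null
    // and no null check is needed before restoring it.
    private final PrintStream originalOut = System.out;

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /** Called at the start of a test: redirect System.out into a buffer. */
    void redirect() {
        System.setOut(new PrintStream(buffer, true));
    }

    /** Would carry @After in JUnit 4: unconditionally restore the saved stream. */
    void restore() {
        System.setOut(originalOut);
    }

    /** Returns everything printed to System.out while it was redirected. */
    String captured() {
        return buffer.toString();
    }
}
```

Because the field is initialized before any test redirects the stream, the restore step can run unconditionally after every test.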



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/991



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-130999180
  
    Hi, I have made new changes.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-135054171
  
    @mxm 
    Hi,
    Sorry for being careless again.
    I have now fixed the problem that caused the CI to fail.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-134781611
  
    @StephanEwen 
    Hi,
    Not yet.
    I will ask Travis support again.




[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37845122
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,267 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.io.PrintStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    --- End diff --
    
    Could you save and restore `System.out` in an `@After` shutdown method?



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37959743
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,330 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return new Map<String, Future<Path>>() {
    +				@Override
    +				public int size() {
    +					return 0;
    +				}
    +
    +				@Override
    +				public boolean isEmpty() {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsKey(Object key) {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsValue(Object value) {
    +					return false;
    +				}
    +
    +				@Override
    +				public Future<Path> get(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> put(String key, Future<Path> value) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> remove(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public void putAll(Map<? extends String, ? extends Future<Path>> m) {
    +
    +				}
    +
    +				@Override
    +				public void clear() {
    +
    +				}
    +
    +				@Override
    +				public Set<String> keySet() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Collection<Future<Path>> values() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Set<Entry<String, Future<Path>>> entrySet() {
    +					return null;
    +				}
    +			};
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				this.envForPrefixNull,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    +		assertEquals("hello world!", stream.result);
    +
    +		printSink.close();
    +	}
    +
    +	@Test
    +	public void testPrintSinkStdErr(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				this.envForPrefixNull,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardErr();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.err", printSink.toString());
    +		assertEquals("hello world!", stream.result);
    +
    +		printSink.close();
    +	}
    +
    +	@Override
    +	public void invoke(IN record) {
    +
    +	}
    +
    +	@After
    +	public void restoreSystemOut() {
    +		System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out)));
    --- End diff --
    
    Could you restore the original PrintStream here? You can save it in a variable like `PrintStream original = System.out` and restore it here.
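
A minimal sketch of the save-and-restore pattern suggested above, in plain Java without JUnit. The class name `RestoreStdoutSketch` and the helper `runWithRedirectedOut` are hypothetical names used only for illustration; they are not part of the pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class RestoreStdoutSketch {

    /** Runs the action with System.out redirected, then puts the original stream back. */
    public static String runWithRedirectedOut(Runnable action) {
        // Remember whichever stream was installed before the test touched it.
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(buffer, true);
        System.setOut(replacement);
        try {
            action.run();
        } finally {
            // Restore the very same object instead of building a new stream
            // on top of FileDescriptor.out.
            System.setOut(original);
        }
        replacement.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        PrintStream before = System.out;
        String captured = runWithRedirectedOut(() -> System.out.println("hello world!"));
        System.out.println("same stream restored: " + (System.out == before));
        System.out.println("captured: " + captured.trim());
    }
}
```

In a JUnit test, the assignment to `original` would typically move into a setup method and the `System.setOut(original)` call into the `@After` method, so the stream is restored even when an assertion fails.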



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37762004
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    You can replace stdout using `System.setOut(..)` by a `PrintStream` which writes to a `ByteArrayOutputStream`.
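
The suggestion above could look roughly like the following sketch. It assumes a stand-in method, `printRecord`, in place of the real sink's `invoke` call; that method and the class name `CaptureStdoutSketch` are hypothetical and only serve to keep the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CaptureStdoutSketch {

    // Hypothetical stand-in for the sink under test: prints one record to System.out.
    static void printRecord(String record) {
        System.out.println(record);
    }

    /** Captures everything printRecord writes to System.out for the given record. */
    static String captureOutput(String record) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Auto-flushing PrintStream that writes into an in-memory buffer.
        System.setOut(new PrintStream(buffer, true));
        try {
            printRecord(record);
        } finally {
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String expected = "hello world!" + System.lineSeparator();
        String actual = captureOutput("hello world!");
        if (!expected.equals(actual)) {
            throw new AssertionError("unexpected output: " + actual);
        }
        System.out.println("captured output matched");
    }
}
```

Comparing against the buffer's contents checks what was actually printed, which an assertion on `toString()` alone does not cover.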



[GitHub] flink issue #991: [FLINK-2480][test]Add tests for PrintSinkFunction

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/flink/pull/991
  
    
    [![Coverage Status](https://coveralls.io/builds/11380091/badge)](https://coveralls.io/builds/11380091)
    
    Changes Unknown when pulling **fde7c49e04df7c5e7315a20594f65f2cd67fbc82 on HuangWHWHW:FLINK-2480** into ** on apache:master**.




[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37964105
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return new Map<String, Future<Path>>() {
    +				@Override
    +				public int size() {
    +					return 0;
    +				}
    +
    +				@Override
    +				public boolean isEmpty() {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsKey(Object key) {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsValue(Object value) {
    +					return false;
    +				}
    +
    +				@Override
    +				public Future<Path> get(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> put(String key, Future<Path> value) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> remove(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public void putAll(Map<? extends String, ? extends Future<Path>> m) {
    +
    +				}
    +
    +				@Override
    +				public void clear() {
    +
    +				}
    +
    +				@Override
    +				public Set<String> keySet() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Collection<Future<Path>> values() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Set<Entry<String, Future<Path>>> entrySet() {
    +					return null;
    +				}
    +			};
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	public PrintStream PrintStreamOriginal = null;
    --- End diff --
    
    Yes, I have addressed both of these comments.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-130295740
  
    Your pull request doesn't compile: https://s3.amazonaws.com/archive.travis-ci.org/jobs/74504427/log.txt



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r36284223
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,225 @@
    +package org.apache.flink.streaming.api.functions;
    --- End diff --
    
    Please add the Apache license header.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r36990804
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    +
    +		printSink.close();
    +	}
    +
    +	@Test
    +	public void testPrintSinkStdErr(){
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardErr();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.err", printSink.toString());
    --- End diff --
    
    Same as above. This just tests the `toString()` method.


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37974911
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,333 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public PrintStream PrintStreamOriginal = System.out;
    --- End diff --
    
    Changed `public PrintStream PrintStreamOriginal = System.out;` to `private PrintStream printStreamOriginal = System.out;` now. :)


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r36990759
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    You're not testing whether the sink actually prints to stdout. You're just checking whether the `toString()` method returns the correct mode.
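
A minimal sketch of the kind of check this comment asks for, using only the JDK: redirect `System.out` into a buffer, run the code under test, and assert on what was actually printed. The class name `StdoutCapture` and the helper `captureStdout` are hypothetical names chosen for illustration and are not part of Flink or of this pull request; the `System.out.println` call stands in for `printSink.invoke("hello world!")`. The redirect has to be in place before the code under test looks up `System.out`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutCapture {

    /** Runs the action with System.out redirected to a buffer and returns the printed text. */
    public static String captureStdout(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(buffer, true);
        System.setOut(replacement);
        try {
            action.run();
        } finally {
            // Always restore the real stream so that later tests are unaffected.
            System.setOut(original);
        }
        replacement.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the sink call; a real test would invoke the sink here.
        String printed = captureStdout(() -> System.out.println("hello world!"));
        if (!printed.equals("hello world!" + System.lineSeparator())) {
            throw new AssertionError("unexpected output: " + printed);
        }
        System.out.println("captured: " + printed.trim());
    }
}
```

Asserting on the buffer contents checks the printing behaviour itself, whereas asserting on `toString()` only checks which target was configured. The same pattern should carry over to `System.err` via `System.setErr`.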


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37847490
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,267 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.io.PrintStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    --- End diff --
    
    Hi, I added this in my new change.


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r38076582
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,332 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public PrintStream printStreamOriginal = System.out;
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return new Map<String, Future<Path>>() {
    +				@Override
    +				public int size() {
    +					return 0;
    +				}
    +
    +				@Override
    +				public boolean isEmpty() {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsKey(Object key) {
    +					return false;
    +				}
    +
    +				@Override
    +				public boolean containsValue(Object value) {
    +					return false;
    +				}
    +
    +				@Override
    +				public Future<Path> get(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> put(String key, Future<Path> value) {
    +					return null;
    +				}
    +
    +				@Override
    +				public Future<Path> remove(Object key) {
    +					return null;
    +				}
    +
    +				@Override
    +				public void putAll(Map<? extends String, ? extends Future<Path>> m) {
    +
    +				}
    +
    +				@Override
    +				public void clear() {
    +
    +				}
    +
    +				@Override
    +				public Set<String> keySet() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Collection<Future<Path>> values() {
    +					return null;
    +				}
    +
    +				@Override
    +				public Set<Entry<String, Future<Path>>> entrySet() {
    +					return null;
    +				}
    +			};
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	public OutputStream out = new OutputStream() {
    +		@Override
    +		public void write(int b) throws IOException {
    +
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		printStreamMock stream = new printStreamMock(out);
    +		System.setOut(stream);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    --- End diff --
    
    You can replace all this by `final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class)`.
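
To illustrate why a generated mock can stand in for the long hand-written `Environment` stub: by default, unstubbed methods on a Mockito mock return values such as `null`, `0`, or `false`, which is what most of the overrides in the diff do by hand. The sketch below is not Mockito and not the change proposed in this pull request. It is a JDK-only approximation of the same idea built on `java.lang.reflect.Proxy`, and the interface `TaskInfo` and the helper `stubOf` are hypothetical names used only for illustration. Note that `Proxy` works for interfaces only, so a concrete class such as `StreamingRuntimeContext` would still need a mocking library.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class DefaultStubSketch {

    /** Hypothetical stand-in for a wide interface with many getters. */
    public interface TaskInfo {
        String getTaskName();
        int getNumberOfSubtasks();
        boolean isRunning();
    }

    /** Builds a stub whose methods all return the Java default value for their return type. */
    @SuppressWarnings("unchecked")
    public static <T> T stubOf(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            Class<?> returnType = method.getReturnType();
            if (returnType == boolean.class) return false;
            if (returnType == char.class) return '\0';
            if (returnType == byte.class) return (byte) 0;
            if (returnType == short.class) return (short) 0;
            if (returnType == int.class) return 0;
            if (returnType == long.class) return 0L;
            if (returnType == float.class) return 0f;
            if (returnType == double.class) return 0d;
            // Reference types and void both map to null.
            return null;
        };
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        TaskInfo stub = stubOf(TaskInfo.class);
        if (stub.getTaskName() != null) throw new AssertionError();
        if (stub.getNumberOfSubtasks() != 0) throw new AssertionError();
        if (stub.isRunning()) throw new AssertionError();
    }
}
```

One generic handler replaces every per-method override, which is the reduction in test code the review comment is after.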


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-135261867
  
    @mxm 
    Hi,
    the CI passes now.
    Was the earlier failure an intermittent one?


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-135062701
  
    Thanks @HuangWHWHW. I'll merge your changes when Travis has completed.


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37845214
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    Ah, didn't see that. Great!


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37053652
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    Sorry, please ignore this.
    I have just figured out how to do it.


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37046489
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    As far as I can see, the sink actually prints in the call printSink.invoke("hello world!"),
    and invoke just calls stream.println(record.toString()).
    Could you tell me how to get the printed output back from the stream?
    I think assertEquals("hello world!", record) would amount to assertEquals("hello world!", "hello world!"), so it is not necessary.
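
One common way to read back what was printed is to swap System.out for a PrintStream backed by an in-memory buffer, run the code, and then restore the original stream. The sketch below uses only the JDK; StdoutCapture is a hypothetical helper name, and a plain System.out.println stands in for the sink's invoke call. It assumes the code under test looks up System.out after the redirect is in place, so for a sink that caches the stream in open(), the redirect would have to happen before open() is called.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper, not part of Flink: runs an action while System.out
// is redirected into an in-memory buffer and returns what was printed.
final class StdoutCapture {

    static String capture(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(buffer, true);
        System.setOut(replacement);
        try {
            action.run();
        } finally {
            // Always restore the real stream, even if the action throws.
            replacement.flush();
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Stand-in for printSink.invoke("hello world!").
        String printed = capture(() -> System.out.println("hello world!"));
        System.out.println("captured: " + printed.trim());
    }
}
```

In a JUnit test the same idea would typically be split into a setup step that installs the buffer and an @After method that restores System.out, so a failing assertion cannot leave the redirect in place for later tests.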


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-127982547
  
    @fhueske
    Thank you! I'll fix it.


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-134635720
  
    @HuangWHWHW Can you access the CI reports now? Has the Travis team fixed the problem?


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37965403
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,333 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public PrintStream PrintStreamOriginal = System.out;
    --- End diff --
    
    By convention, field names start with a lower-case letter, so this should be `printStreamOriginal`.


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r36284181
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,225 @@
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +//import org.apache.flink.api.common.functions.RuntimeContext;
    --- End diff --
    
    Please remove the imports which are commented out.


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37974619
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,333 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public PrintStream PrintStreamOriginal = System.out;
    --- End diff --
    
    Ah, sorry, I understand what you mean now.


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r38076538
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,332 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.io.*;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	public PrintStream printStreamOriginal = System.out;
    +
    +	public class printStreamMock extends PrintStream{
    +
    +		public String result;
    +
    +		public printStreamMock(OutputStream out) {
    +			super(out);
    +		}
    +
    +		@Override
    +		public void println(String x) {
    +			this.result = x;
    +		}
    +	}
    +
    +	private Environment envForPrefixNull = new Environment() {
    --- End diff --
    
    You can replace all this by `Mockito.mock(Environment.class)`.
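
To illustrate why a generated mock can stand in for the long hand-written anonymous class: a mock of an interface answers every call with a default value, which is what most of the overrides in the diff do by hand. The sketch below is not Mockito. It imitates that default behaviour with the JDK's java.lang.reflect.Proxy so that it runs without extra dependencies. TaskEnvironment and DefaultStub are hypothetical names, and the interface is a small stand-in for Flink's Environment, assuming that type is an interface.

```java
import java.lang.reflect.Proxy;

// Hypothetical stand-in for a large interface such as Flink's Environment.
interface TaskEnvironment {
    String getTaskName();
    int getNumberOfSubtasks();
    int getIndexInSubtaskGroup();
    boolean isCanceled();
}

final class DefaultStub {

    // Returns an implementation of iface whose methods all return the
    // Java default for their return type: null, zero or false.
    static <T> T of(Class<T> iface) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (self, method, args) -> defaultFor(method.getReturnType()));
        return iface.cast(proxy);
    }

    // A proxy must return the exact wrapper type for each primitive.
    private static Object defaultFor(Class<?> type) {
        if (!type.isPrimitive() || type == void.class) return null;
        if (type == boolean.class) return false;
        if (type == char.class) return '\0';
        if (type == byte.class) return (byte) 0;
        if (type == short.class) return (short) 0;
        if (type == int.class) return 0;
        if (type == long.class) return 0L;
        if (type == float.class) return 0f;
        return 0d; // double
    }
}
```

With this helper, `TaskEnvironment env = DefaultStub.of(TaskEnvironment.class);` yields an object whose getNumberOfSubtasks() returns 0 and whose getTaskName() returns null, with no per-method overrides. A mocking library additionally lets individual methods be stubbed with specific return values, which this sketch does not attempt.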


---

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/991#discussion_r37835967
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
    +import org.apache.flink.runtime.memorymanager.MemoryManager;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
    + */
    +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
    +
    +	private Environment envForPrefixNull = new Environment() {
    +		@Override
    +		public JobID getJobID() {
    +			return null;
    +		}
    +
    +		@Override
    +		public JobVertexID getJobVertexId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ExecutionAttemptID getExecutionId() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getTaskConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public TaskManagerRuntimeInfo getTaskManagerInfo() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Configuration getJobConfiguration() {
    +			return null;
    +		}
    +
    +		@Override
    +		public int getNumberOfSubtasks() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public int getIndexInSubtaskGroup() {
    +			return 0;
    +		}
    +
    +		@Override
    +		public InputSplitProvider getInputSplitProvider() {
    +			return null;
    +		}
    +
    +		@Override
    +		public IOManager getIOManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public MemoryManager getMemoryManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskName() {
    +			return null;
    +		}
    +
    +		@Override
    +		public String getTaskNameWithSubtasks() {
    +			return null;
    +		}
    +
    +		@Override
    +		public ClassLoader getUserClassLoader() {
    +			return null;
    +		}
    +
    +		@Override
    +		public Map<String, Future<Path>> getDistributedCacheEntries() {
    +			return null;
    +		}
    +
    +		@Override
    +		public BroadcastVariableManager getBroadcastVariableManager() {
    +			return null;
    +		}
    +
    +		@Override
    +		public AccumulatorRegistry getAccumulatorRegistry() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId) {
    +
    +		}
    +
    +		@Override
    +		public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
    +
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter getWriter(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public ResultPartitionWriter[] getAllWriters() {
    +			return new ResultPartitionWriter[0];
    +		}
    +
    +		@Override
    +		public InputGate getInputGate(int index) {
    +			return null;
    +		}
    +
    +		@Override
    +		public InputGate[] getAllInputGates() {
    +			return new InputGate[0];
    +		}
    +	};
    +
    +	@Test
    +	public void testPrintSinkStdOut(){
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
    +		final StreamingRuntimeContext ctx = new StreamingRuntimeContext(
    +				"Test Print Sink",
    +				this.envForPrefixNull,
    +				null,
    +				executionConfig,
    +				null,
    +				null,
    +				accumulators
    +		);
    +
    +		PrintSinkFunction<String> printSink = new PrintSinkFunction<String>();
    +		printSink.setRuntimeContext(ctx);
    +		try {
    +			printSink.open(new Configuration());
    +		} catch (Exception e) {
    +			e.printStackTrace();
    +		}
    +		printSink.setTargetToStandardOut();
    +		printSink.invoke("hello world!");
    +
    +		assertEquals("Print to System.out", printSink.toString());
    --- End diff --
    
    Yes, I did this in my new change.
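
For context, the assertion quoted above compares `printSink.toString()` with a fixed string, which appears to check the sink's description rather than the text that `invoke` wrote. One way to assert on the written text is to redirect `System.out` to an in-memory buffer for the duration of the call. The following is a minimal sketch under stated assumptions: `StdOutCapture` and `captureStdOut` are hypothetical helper names, and a plain `System.out.println` call stands in for `printSink.invoke(...)` so that the snippet needs no Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdOutCapture {

    /** Runs the action with System.out redirected and returns everything it printed. */
    public static String captureStdOut(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(buffer, true);
        System.setOut(capture);
        try {
            action.run();
        } finally {
            // Always restore the real stream, even if the action throws.
            System.setOut(original);
            capture.flush();
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Stand-in for printSink.invoke("hello world!") in the real test.
        String captured = captureStdOut(() -> System.out.println("hello world!"));
        System.out.println("captured: " + captured.trim());
    }
}
```

Restoring the original stream in a `finally` block keeps a failing test from leaving `System.out` redirected for the tests that run after it. The same pattern works for `System.err` with `System.setErr`.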



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-134565956
  
    Thanks for the updates! Looks good and I think we can merge your changes.



[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/991#issuecomment-127834382
  
    I still cannot see the CI results.
    Can anyone help me get the CI info for this pull request?

