Posted to commits@beam.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/08/31 13:41:01 UTC
[jira] [Commented] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
[ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148985#comment-16148985 ]
Aljoscha Krettek commented on BEAM-2831:
----------------------------------------
Could you try running it with this modified {{SerializableCoder}}:
{code}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {
    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
          + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   *         deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
        && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}
The only change is in {{encode()}}, where we no longer wrap the {{EOFException}}. I think this should fix the problem, and if it does, we should include this change in Beam.
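To illustrate why the wrapping matters, here is a minimal, self-contained sketch. It does not use Beam or Flink classes: {{WrappingException}}, {{writeOrSpill}}, {{fullBuffer}} and {{wrappedWrite}} are hypothetical stand-ins, and the assumption is only that the caller reacts to an {{EOFException}} by spilling while any other {{IOException}} is treated as a failure. Once the {{EOFException}} is wrapped, the caller's {{catch (EOFException e)}} clause no longer matches.

```java
import java.io.EOFException;
import java.io.IOException;

public class WrappedEofDemo {

  /** Hypothetical stand-in for an exception type that wraps the original cause. */
  static class WrappingException extends IOException {
    WrappingException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** A write operation that may throw IOException. */
  interface Writer {
    void write() throws IOException;
  }

  /**
   * Hypothetical stand-in for a caller that treats EOFException as
   * "buffer full, spill to disk" and any other IOException as a failure.
   */
  static String writeOrSpill(Writer writer) {
    try {
      writer.write();
      return "written";
    } catch (EOFException e) {
      return "spilled";
    } catch (IOException e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  /** Simulates a write into a full buffer. */
  static void fullBuffer() throws IOException {
    throw new EOFException("buffer full");
  }

  /** Same write, but the EOFException is wrapped before it reaches the caller. */
  static void wrappedWrite() throws IOException {
    try {
      fullBuffer();
    } catch (IOException e) {
      throw new WrappingException("unable to serialize record", e);
    }
  }

  public static void main(String[] args) {
    // The unwrapped EOFException reaches the caller's EOFException handler.
    System.out.println(writeOrSpill(WrappedEofDemo::fullBuffer));
    // The wrapped exception bypasses that handler and is treated as a failure.
    System.out.println(writeOrSpill(WrappedEofDemo::wrappedWrite));
  }
}
```

In this sketch the first call takes the spill path and the second does not, which is the behaviour the modified {{encode()}} is meant to avoid by letting the {{EOFException}} propagate unchanged.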
> Possible bug in Beam+Flink memory management, disk spillover
> ------------------------------------------------------------
>
> Key: BEAM-2831
> URL: https://issues.apache.org/jira/browse/BEAM-2831
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.0.0, 2.1.0
> Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 10.12.6 and unknown Linux
> Reporter: Reinier Kip
> Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail.
> As [discussed on Flink's mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html] (stacktrace enclosed), Flink catches these EOFExceptions and activates disk spillover. Because Beam wraps these exceptions, this mechanism fails, the exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be adjusted for in Beam. I'd be glad to provide more information where needed.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)