Posted to commits@beam.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/08/31 13:42:00 UTC

[jira] [Comment Edited] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover

    [ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148985#comment-16148985 ] 

Aljoscha Krettek edited comment on BEAM-2831 at 8/31/17 1:41 PM:
-----------------------------------------------------------------

Could you try running it with this modified {{SerializableCoder}}:
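{code}
// Same package as Beam's own SerializableCoder, so this copy can stand in for it on the classpath.
package org.apache.beam.sdk.coders;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.values.TypeDescriptor;

public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {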

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
              + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

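  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    // IOExceptions are deliberately not caught and wrapped here. In particular, an
    // EOFException raised by the underlying stream reaches the caller unchanged.
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }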

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   *         deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
            && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

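The only change is in {{encode()}}: it no longer catches the {{IOException}} thrown while writing and re-throws it wrapped in a {{CoderException}}, so an {{EOFException}} now reaches Flink unchanged and can trigger the disk spillover described in the issue. I think this should fix the problem; if it does, we should include this change in Beam.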
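To make the failure mode concrete, here is a small stand-alone sketch in plain Java (no Beam or Flink on the classpath). It assumes, as the issue describes, that the consumer signals a full memory buffer by throwing {{EOFException}} and reacts by catching exactly that type. {{WrappedCoderException}} is a hypothetical stand-in for Beam's {{CoderException}} (which is itself an {{IOException}}), and {{BoundedOutputStream}} is a hypothetical stand-in for Flink's memory-segment output; neither is a real API.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

public class EofWrappingDemo {

  /** Hypothetical stand-in for Beam's CoderException: an IOException subclass. */
  static class WrappedCoderException extends IOException {
    WrappedCoderException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical stand-in for a fixed-size memory buffer: throws EOFException once full. */
  static class BoundedOutputStream extends OutputStream {
    private final int capacity;
    private int written;

    BoundedOutputStream(int capacity) {
      this.capacity = capacity;
    }

    @Override
    public void write(int b) throws IOException {
      if (written >= capacity) {
        throw new EOFException("buffer full");
      }
      written++; // the bytes themselves are discarded; only the count matters here
    }
  }

  /** Old behavior: every IOException, including EOFException, is wrapped. */
  static void encodeWrapping(Serializable value, OutputStream out) throws IOException {
    try {
      ObjectOutputStream oos = new ObjectOutputStream(out);
      oos.writeObject(value);
      oos.flush();
    } catch (IOException e) {
      throw new WrappedCoderException("unable to serialize record " + value, e);
    }
  }

  /** New behavior: IOExceptions propagate unchanged. */
  static void encodeUnwrapped(Serializable value, OutputStream out) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(out);
    oos.writeObject(value);
    oos.flush();
  }

  /** Mimics a caller that treats EOFException as "buffer full, spill to disk". */
  static String writeWithSpill(boolean wrapping) {
    BoundedOutputStream out = new BoundedOutputStream(16);
    String record = "a record that does not fit into sixteen bytes";
    try {
      if (wrapping) {
        encodeWrapping(record, out);
      } else {
        encodeUnwrapped(record, out);
      }
      return "written";
    } catch (EOFException e) {
      return "spilled";
    } catch (IOException e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(writeWithSpill(true));
    System.out.println(writeWithSpill(false));
  }
}
```

With the wrapping {{encode}}, the {{catch (EOFException e)}} branch never matches, because the thrown object is the wrapper, and the write ends in the generic failure branch. With the unwrapped variant the caller sees the {{EOFException}} itself and takes the spill branch.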


> Possible bug in Beam+Flink memory management, disk spillover
> ------------------------------------------------------------
>
>                 Key: BEAM-2831
>                 URL: https://issues.apache.org/jira/browse/BEAM-2831
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 10.12.6 and unknown Linux
>            Reporter: Reinier Kip
>            Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail.
> As [discussed on Flink's mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html] (stack trace enclosed), Flink catches these EOFExceptions and activates disk spillover. Because Beam wraps these exceptions, this mechanism fails, the exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be adjusted for in Beam. I'd be glad to provide more information where needed.
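The spillover mechanism the issue relies on can be sketched roughly as follows. This is not Flink's actual sorter code: {{MemoryBuffer}}, {{writeAll}} and the in-memory list of "spilled runs" are hypothetical stand-ins chosen for illustration. The idea it shows is the one described above: records are serialized into a fixed-size buffer, an {{EOFException}} means "buffer full", the partially written record is discarded, the buffer is flushed out, and the record is written again. It only works if the coder lets the {{EOFException}} through, as the unwrapped {{encode}} in this sketch does.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpillOnEofSketch {

  /** Hypothetical fixed-size "memory segment": throws EOFException once it is full. */
  static class MemoryBuffer extends OutputStream {
    private final byte[] data;
    private int size;

    MemoryBuffer(int capacity) {
      this.data = new byte[capacity];
    }

    @Override
    public void write(int b) throws IOException {
      if (size == data.length) {
        throw new EOFException("memory buffer full");
      }
      data[size++] = (byte) b;
    }

    int size() {
      return size;
    }

    /** Drops the bytes of a partially written record. */
    void truncate(int newSize) {
      size = newSize;
    }

    /** Returns the buffered bytes and empties the buffer. */
    byte[] drain() {
      byte[] copy = Arrays.copyOf(data, size);
      size = 0;
      return copy;
    }
  }

  /** Encodes one record without wrapping IOExceptions, like the modified coder. */
  static void encode(Serializable value, OutputStream out) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(out);
    oos.writeObject(value);
    oos.flush();
  }

  /** Buffers records in memory and "spills" the buffer whenever it fills up. */
  static List<byte[]> writeAll(List<? extends Serializable> records, int capacity)
      throws IOException {
    MemoryBuffer buffer = new MemoryBuffer(capacity);
    List<byte[]> spilledRuns = new ArrayList<>(); // stand-in for spill files on disk
    for (Serializable record : records) {
      int mark = buffer.size();
      try {
        encode(record, buffer);
      } catch (EOFException full) {
        buffer.truncate(mark); // discard the partial record
        if (mark == 0) {
          throw full; // a single record larger than the whole buffer
        }
        spilledRuns.add(buffer.drain()); // spill, then retry into the empty buffer
        encode(record, buffer);
      }
    }
    if (buffer.size() > 0) {
      spilledRuns.add(buffer.drain());
    }
    return spilledRuns;
  }
}
```

Because each {{encode}} call opens a fresh {{ObjectOutputStream}}, every record is a self-contained serialization stream (header included), which is what allows a partial record to be dropped by simply truncating the buffer back to the last record boundary.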



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)